Documentation
¶
Index ¶
- Constants
- func IsCancelError(err error) bool
- func IsDrainingError(err error) bool
- func IsErrNotFound(err error) bool
- func IsNonRetriable(err error) bool
- func IsSessionLockedError(err error) bool
- func ShouldRecover(ctx context.Context, err error) bool
- type AMQPLinks
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type AMQPSession
- type AMQPSessionCloser
- type BackoffRetrierParams
- type Closeable
- type CreateLinkFunc
- type Disposition
- type DispositionStatus
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNotFound
- type FakeAMQPLinks
- type FakeAMQPReceiver
- type FakeAMQPSender
- type FakeAMQPSession
- type FakeNS
- func (ns *FakeNS) GetEntityAudience(entityPath string) string
- func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
- func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
- func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
- func (ns *FakeNS) NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error)
- func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
- func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) error
- type MgmtClient
- type Namespace
- func (ns *Namespace) Close(ctx context.Context) error
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) GetHostname() string
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
- func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
- func (ns *Namespace) NewMgmtClient(ctx context.Context, l AMQPLinks) (MgmtClient, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
- func (ns *Namespace) Recover(ctx context.Context, clientRevision uint64) error
- type NamespaceForAMQPLinks
- type NamespaceForMgmtClient
- type NamespaceOption
- func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket() NamespaceOption
- func NamespacesWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NonRetriable
- type RPCLink
- type ReceiveMode
- type Retrier
Constants ¶
const Version = "v0.2.0"
TODO: this should move into a proper file. Need to resolve some interdependency issues between the public and internal packages first. Version is the semantic version number.
Variables ¶
This section is empty.
Functions ¶
func IsCancelError ¶
func IsDrainingError ¶
func IsErrNotFound ¶
IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsNonRetriable ¶
IsNonRetriable indicates an error is fatal. Typically, this means the connection or link has been closed.
func IsSessionLockedError ¶ added in v0.2.0
IsSessionLockedError checks to see if this is the "you tried to get a session that was already locked" error.
Types ¶
type AMQPLinks ¶
type AMQPLinks interface { EntityPath() string ManagementPath() string Audience() string // Get will initialize a session and call its link.linkCreator function. // If this link has been closed via Close() it will return a non-retriable error. Get(ctx context.Context) (AMQPSender, AMQPReceiver, MgmtClient, uint64, error) // RecoverIfNeeded will check if an error requires recovery, and will recover // the link or, possibly, the connection. RecoverIfNeeded(ctx context.Context, linksRevision uint64, err error) error // Close will close the link. // If permanent is true the link will not be auto-recreated if Get/Recover // are called. All functions will return `ErrLinksClosed` Close(ctx context.Context, permanent bool) error // ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called. ClosedPermanently() bool }
type AMQPReceiver ¶
type AMQPReceiver interface { IssueCredit(credit uint32) error DrainCredit(ctx context.Context) error Receive(ctx context.Context) (*amqp.Message, error) // settlement functions AcceptMessage(ctx context.Context, msg *amqp.Message) error RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error ReleaseMessage(ctx context.Context, msg *amqp.Message) error ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error LinkName() string LinkSourceFilterValue(name string) interface{} }
AMQPReceiver is implemented by *amqp.Receiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser interface { AMQPReceiver Close(ctx context.Context) error }
AMQPReceiverCloser is implemented by *amqp.Receiver
type AMQPSender ¶
type AMQPSender interface { Send(ctx context.Context, msg *amqp.Message) error MaxMessageSize() uint64 }
AMQPSender is implemented by *amqp.Sender
type AMQPSenderCloser ¶
type AMQPSenderCloser interface { AMQPSender Close(ctx context.Context) error }
AMQPSenderCloser is implemented by *amqp.Sender
type AMQPSession ¶
type AMQPSession interface { NewReceiver(opts ...amqp.LinkOption) (*amqp.Receiver, error) NewSender(opts ...amqp.LinkOption) (*amqp.Sender, error) }
AMQPSession is implemented by *amqp.Session
type AMQPSessionCloser ¶
type AMQPSessionCloser interface { AMQPSession Close(ctx context.Context) error }
AMQPSessionCloser is implemented by *amqp.Session
type BackoffRetrierParams ¶
type BackoffRetrierParams struct { // MaxRetries is the maximum number of tries (after the first attempt) // that are allowed. MaxRetries int // Factor is the multiplying factor for each increment step Factor float64 // Jitter eases contention by randomizing backoff steps Jitter bool // Min and Max are the minimum and maximum values of the counter Min, Max time.Duration }
BackoffRetrierParams are parameters for NewBackoffRetrier.
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type CreateLinkFunc ¶
type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
type Disposition ¶
type Disposition struct { Status DispositionStatus LockTokens []*uuid.UUID DeadLetterReason *string DeadLetterDescription *string }
type DispositionStatus ¶
type DispositionStatus string
const ( CompletedDisposition DispositionStatus = "completed" AbandonedDisposition DispositionStatus = "abandoned" SuspendedDisposition DispositionStatus = "suspended" DeferredDisposition DispositionStatus = "defered" )
type ErrConnectionClosed ¶
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func NewErrIncorrectType ¶
func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType
NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.
func (ErrIncorrectType) Error ¶
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message that was expected to be in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶
func (e ErrMissingField) Error() string
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type FakeAMQPLinks ¶
type FakeAMQPLinks struct { AMQPLinks Closed int // values to be returned for each `Get` call Revision uint64 Receiver AMQPReceiver Sender AMQPSender Mgmt MgmtClient Err error // contains filtered or unexported fields }
func (*FakeAMQPLinks) Close ¶
func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error
func (*FakeAMQPLinks) ClosedPermanently ¶
func (l *FakeAMQPLinks) ClosedPermanently() bool
func (*FakeAMQPLinks) Get ¶
func (l *FakeAMQPLinks) Get(ctx context.Context) (AMQPSender, AMQPReceiver, MgmtClient, uint64, error)
type FakeAMQPReceiver ¶
type FakeAMQPReceiver struct { AMQPReceiver Closed int Drain int }
func (*FakeAMQPReceiver) DrainCredit ¶
func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error
type FakeAMQPSender ¶
type FakeAMQPSender struct { Closed int AMQPSender }
type FakeAMQPSession ¶
type FakeAMQPSession struct { AMQPSessionCloser // contains filtered or unexported fields }
type FakeNS ¶
type FakeNS struct { MgmtClient MgmtClient RPCLink *rpc.Link Session AMQPSessionCloser AMQPLinks *FakeAMQPLinks // contains filtered or unexported fields }
func (*FakeNS) GetEntityAudience ¶
func (*FakeNS) NegotiateClaim ¶
func (*FakeNS) NewAMQPLinks ¶
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
func (*FakeNS) NewAMQPSession ¶
func (*FakeNS) NewMgmtClient ¶
func (*FakeNS) NewRPCLink ¶
type MgmtClient ¶
type MgmtClient interface { Close(ctx context.Context) error SendDisposition(ctx context.Context, lockToken *amqp.UUID, state Disposition) error ReceiveDeferred(ctx context.Context, mode ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error) PeekMessages(ctx context.Context, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) ScheduleMessages(ctx context.Context, enqueueTime time.Time, messages ...*amqp.Message) ([]int64, error) CancelScheduled(ctx context.Context, seq ...int64) error RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error) RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error) GetSessionState(ctx context.Context, sessionID string) ([]byte, error) SetSessionState(ctx context.Context, sessionID string, state []byte) error }
type Namespace ¶
type Namespace struct { Name string Suffix string TokenProvider *sbauth.TokenProvider Environment azure.Environment // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) GetHostname ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
NegotiateClaim performs initial authentication and starts periodic refresh of credentials. The returned func is used to cancel() the refresh goroutine.
func (*Namespace) NewAMQPLinks ¶
func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
NewAMQPLinks creates an AMQPLinks struct, which groups together the commonly needed links for working with Service Bus.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client.
func (*Namespace) NewMgmtClient ¶
NewMgmtClient creates a new management client with the internally cached *amqp.Client.
func (*Namespace) NewRPCLink ¶
NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error) NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error) GetEntityAudience(entityPath string) string Recover(ctx context.Context, clientRevision uint64) error }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForMgmtClient ¶
type NamespaceForMgmtClient interface {
NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
}
NamespaceForMgmtClient is the Namespace surface needed for the *MgmtClient.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Service Bus namespace
func NamespaceWithAzureEnvironment ¶
func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption
NamespaceWithAzureEnvironment sets the namespace's Environment, Suffix and ResourceURI parameters according to the Azure Environment defined in the "github.com/Azure/go-autorest/autorest/azure" package. This allows the library to be configured for use in the different Azure clouds. environmentName is the name of the cloud as defined in autorest: https://github.com/Azure/go-autorest/blob/b076c1437d051bf4c328db428b70f4fe22ad38b0/autorest/azure/environments.go#L34-L39
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket() NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
func NamespacesWithTokenCredential ¶
func NamespacesWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespacesWithTokenCredential sets the token provider on the namespace. fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net).
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface {
NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
}
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NonRetriable ¶
type NonRetriable interface { error NonRetriable() }
type RPCLink ¶
type RPCLink interface { Close(ctx context.Context) error RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*rpc.Response, error) }
RPCLink is implemented by *rpc.Link
type ReceiveMode ¶
type ReceiveMode int
ReceiveMode represents the lock style to use for a receiver - either `PeekLock` or `ReceiveAndDelete`
const ( // PeekLock will lock messages as they are received and can be settled // using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message // functions. PeekLock ReceiveMode = 0 // ReceiveAndDelete will delete messages as they are received. ReceiveAndDelete ReceiveMode = 1 )
type Retrier ¶
type Retrier interface { // Copies the retrier. Retriers are stateful and must be copied // before starting a set of retries. Copy() Retrier // Exhausted is true if the retries were exhausted. Exhausted() bool // CurrentTry is the current try (0 for the first run before retries) CurrentTry() int // Try marks an attempt to call (first call to Try() does not sleep). // Will return false if the `ctx` is cancelled or if we exhaust our retries. // // rp := RetryPolicy{Backoff:defaultBackoffPolicy, MaxRetries:5} // // for rp.Try(ctx) { // <your code> // } // // if rp.Cancelled() || rp.Exhausted() { // // no more retries needed // } // Try(ctx context.Context) bool }
Retrier allows you to write a basic for loop and get backoff and retry limits. See `Try` for more details on how to use it.
func NewBackoffRetrier ¶
func NewBackoffRetrier(params BackoffRetrierParams) Retrier
NewBackoffRetrier creates a retrier that allows for configurable min/max times, jitter and maximum retries.