internal

package
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2022 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// internal operations
	EventRetry = "azsb.Retry"
)
View Source
const RecoveryKindConn recoveryKind = "connection"
View Source
const RecoveryKindFatal recoveryKind = "fatal"
View Source
const RecoveryKindLink recoveryKind = "link"
View Source
const RecoveryKindNone recoveryKind = ""
View Source
const Version = "v0.3.3"

TODO: this should move into a proper file. Need to resolve some interdependency issues between the public and internal packages first. Version is the semantic version number

Variables

This section is empty.

Functions

func GetRecoveryKind added in v0.3.3

func GetRecoveryKind(ctxForLogging context.Context, err error) recoveryKind

func IsCancelError

func IsCancelError(err error) bool

func IsDrainingError

func IsDrainingError(err error) bool

func IsErrNotFound

func IsErrNotFound(err error) bool

IsErrNotFound returns true if the error argument is an ErrNotFound type

func IsNonRetriable

func IsNonRetriable(err error) bool

IsNonRetriable indicates an error is fatal. Typically, this means the connection or link has been closed.

func Retry added in v0.3.3

func Retry(ctx context.Context, name string, fn func(ctx context.Context, args *RetryFnArgs) error, isFatalFn func(err error) bool, o RetryOptions) error

Retry runs a standard retry loop. It executes your passed in fn as the body of the loop. 'isFatal' can be nil, and defaults to just checking that ServiceBusError(err).recoveryKind != recoveryKindNonRetriable. It returns if it exceeds the number of configured retry options or if 'isFatal' returns true.

func ShouldRecover

func ShouldRecover(ctx context.Context, err error) bool

Types

type AMQPLinks interface {
	EntityPath() string
	ManagementPath() string

	Audience() string

	// Get will initialize a session and call its link.linkCreator function.
	// If this link has been closed via Close() it will return an non retriable error.
	Get(ctx context.Context) (AMQPSender, AMQPReceiver, MgmtClient, uint64, error)

	// RecoverIfNeeded will check if an error requires recovery, and will recover
	// the link or, possibly, the connection.
	RecoverIfNeeded(ctx context.Context, linksRevision uint64, err error) error

	// Close will close the the link.
	// If permanent is true the link will not be auto-recreated if Get/Recover
	// are called. All functions will return `ErrLinksClosed`
	Close(ctx context.Context, permanent bool) error

	// ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.
	ClosedPermanently() bool
}

type AMQPReceiver

type AMQPReceiver interface {
	IssueCredit(credit uint32) error
	DrainCredit(ctx context.Context) error
	Receive(ctx context.Context) (*amqp.Message, error)
	Prefetched(ctx context.Context) (*amqp.Message, error)

	// settlement functions
	AcceptMessage(ctx context.Context, msg *amqp.Message) error
	RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
	ReleaseMessage(ctx context.Context, msg *amqp.Message) error
	ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error

	LinkName() string
	LinkSourceFilterValue(name string) interface{}
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPReceiverCloser

type AMQPReceiverCloser interface {
	AMQPReceiver
	Close(ctx context.Context) error
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPSender

type AMQPSender interface {
	Send(ctx context.Context, msg *amqp.Message) error
	MaxMessageSize() uint64
}

AMQPSender is implemented by *amqp.Sender

type AMQPSenderCloser

type AMQPSenderCloser interface {
	AMQPSender
	Close(ctx context.Context) error
}

AMQPSenderCloser is implemented by *amqp.Sender

type AMQPSession

type AMQPSession interface {
	NewReceiver(opts ...amqp.LinkOption) (*amqp.Receiver, error)
	NewSender(opts ...amqp.LinkOption) (*amqp.Sender, error)
}

AMQPSession is implemented by *amqp.Session

type AMQPSessionCloser

type AMQPSessionCloser interface {
	AMQPSession
	Close(ctx context.Context) error
}

AMQPSessionCloser is implemented by *amqp.Session

type BackoffRetrierParams

type BackoffRetrierParams struct {
	// MaxRetries is the maximum number of tries (after the first attempt)
	// that are allowed.
	MaxRetries int
	// Factor is the multiplying factor for each increment step
	Factor float64
	// Jitter eases contention by randomizing backoff steps
	Jitter bool
	// Min and Max are the minimum and maximum values of the counter
	Min, Max time.Duration
}

BackoffRetrierParams are parameters for NewBackoffRetrier.

type Closeable

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.

type CreateLinkFunc

type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)

CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.

type Disposition

type Disposition struct {
	Status                DispositionStatus
	LockTokens            []*uuid.UUID
	DeadLetterReason      *string
	DeadLetterDescription *string
}

type DispositionStatus

type DispositionStatus string
const (
	CompletedDisposition DispositionStatus = "completed"
	AbandonedDisposition DispositionStatus = "abandoned"
	SuspendedDisposition DispositionStatus = "suspended"
	DeferredDisposition  DispositionStatus = "defered"
)

type ErrAMQP

type ErrAMQP rpc.Response

ErrAMQP indicates that the server communicated an AMQP error with a particular

func (ErrAMQP) Error

func (e ErrAMQP) Error() string

type ErrConnectionClosed

type ErrConnectionClosed string

ErrConnectionClosed indicates that the connection has been closed.

func (ErrConnectionClosed) Error

func (e ErrConnectionClosed) Error() string

type ErrIncorrectType

type ErrIncorrectType struct {
	Key          string
	ExpectedType reflect.Type
	ActualValue  interface{}
}

ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func NewErrIncorrectType

func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType

NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.

func (ErrIncorrectType) Error

func (e ErrIncorrectType) Error() string

type ErrMalformedMessage

type ErrMalformedMessage string

ErrMalformedMessage indicates that a message was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.

func (ErrMalformedMessage) Error

func (e ErrMalformedMessage) Error() string

type ErrMissingField

type ErrMissingField string

ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrMissingField) Error

func (e ErrMissingField) Error() string

type ErrNoMessages

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.

func (ErrNoMessages) Error

func (e ErrNoMessages) Error() string

type ErrNonRetriable added in v0.3.3

type ErrNonRetriable struct {
	Message string
}

func (ErrNonRetriable) Error added in v0.3.3

func (e ErrNonRetriable) Error() string

type ErrNotFound

type ErrNotFound struct {
	EntityPath string
}

ErrNotFound is returned when an entity is not found (404)

func (ErrNotFound) Error

func (e ErrNotFound) Error() string
type FakeAMQPLinks struct {
	AMQPLinks

	Closed int

	// values to be returned for each `Get` call
	Revision uint64
	Receiver AMQPReceiver
	Sender   AMQPSender
	Mgmt     MgmtClient
	Err      error
	// contains filtered or unexported fields
}

func (*FakeAMQPLinks) Close

func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error

func (*FakeAMQPLinks) ClosedPermanently

func (l *FakeAMQPLinks) ClosedPermanently() bool

func (*FakeAMQPLinks) Get

type FakeAMQPReceiver

type FakeAMQPReceiver struct {
	AMQPReceiver
	Closed int
	Drain  int
}

func (*FakeAMQPReceiver) Close

func (r *FakeAMQPReceiver) Close(ctx context.Context) error

func (*FakeAMQPReceiver) DrainCredit

func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error

type FakeAMQPSender

type FakeAMQPSender struct {
	Closed int
	AMQPSender
}

func (*FakeAMQPSender) Close

func (s *FakeAMQPSender) Close(ctx context.Context) error

type FakeAMQPSession

type FakeAMQPSession struct {
	AMQPSessionCloser
	// contains filtered or unexported fields
}

func (*FakeAMQPSession) Close

func (s *FakeAMQPSession) Close(ctx context.Context) error

type FakeNS

type FakeNS struct {
	MgmtClient MgmtClient
	RPCLink    *rpc.Link
	Session    AMQPSessionCloser
	AMQPLinks  *FakeAMQPLinks
	// contains filtered or unexported fields
}

func (*FakeNS) GetEntityAudience

func (ns *FakeNS) GetEntityAudience(entityPath string) string

func (*FakeNS) NegotiateClaim

func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks

func (*FakeNS) NewAMQPSession

func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)

func (*FakeNS) NewMgmtClient

func (ns *FakeNS) NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error)
func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)

func (*FakeNS) Recover

func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) error

type MgmtClient

type MgmtClient interface {
	Close(ctx context.Context) error
	SendDisposition(ctx context.Context, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error
	ReceiveDeferred(ctx context.Context, mode ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error)
	PeekMessages(ctx context.Context, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error)

	ScheduleMessages(ctx context.Context, enqueueTime time.Time, messages ...*amqp.Message) ([]int64, error)
	CancelScheduled(ctx context.Context, seq ...int64) error

	RenewLocks(ctx context.Context, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)
	RenewSessionLock(ctx context.Context, sessionID string) (time.Time, error)

	GetSessionState(ctx context.Context, sessionID string) ([]byte, error)
	SetSessionState(ctx context.Context, sessionID string, state []byte) error
}

type Namespace

type Namespace struct {
	FQDN          string
	TokenProvider *sbauth.TokenProvider
	// contains filtered or unexported fields
}

Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.

func NewNamespace

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)

func (*Namespace) Close

func (ns *Namespace) Close(ctx context.Context) error

Close closes the current cached client.

func (*Namespace) GetEntityAudience

func (ns *Namespace) GetEntityAudience(entityPath string) string

func (*Namespace) GetHTTPSHostURI

func (ns *Namespace) GetHTTPSHostURI() string

func (*Namespace) NegotiateClaim

func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)

negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.

func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks

NewAMQPLinks creates an AMQPLinks struct, which groups together the commonly needed links for working with Service Bus.

func (*Namespace) NewAMQPSession

func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)

NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client.

func (*Namespace) NewMgmtClient

func (ns *Namespace) NewMgmtClient(ctx context.Context, l AMQPLinks) (MgmtClient, error)

NewMgmtClient creates a new management client with the internally cached *amqp.Client.

func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)

NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.

func (*Namespace) Recover

func (ns *Namespace) Recover(ctx context.Context, clientRevision uint64) error

Recover destroys the currently held client and recreates it. clientRevision being nil will recover without a revision check.

type NamespaceForAMQPLinks interface {
	NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
	NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
	NewMgmtClient(ctx context.Context, links AMQPLinks) (MgmtClient, error)
	GetEntityAudience(entityPath string) string
	Recover(ctx context.Context, clientRevision uint64) error
}

NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.

type NamespaceForMgmtClient

type NamespaceForMgmtClient interface {
	NewRPCLink(ctx context.Context, managementPath string) (*rpc.Link, error)
}

NamespaceForAMQPLinks is the Namespace surface needed for the *MgmtClient.

type NamespaceOption

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Service Bus namespace

func NamespaceWithConnectionString

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string

func NamespaceWithTLSConfig

func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption

NamespaceWithTLSConfig appends to the TLS config.

func NamespaceWithUserAgent

func NamespaceWithUserAgent(userAgent string) NamespaceOption

NamespaceWithUserAgent appends to the root user-agent value.

func NamespaceWithWebSocket

func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error)) NamespaceOption

NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://

func NamespacesWithTokenCredential

func NamespacesWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption

NamespacesWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net)

type NamespaceWithNewAMQPLinks interface {
	NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
}

NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.

type NewWebSocketConnArgs added in v0.3.2

type NewWebSocketConnArgs struct {

	// Host is the the `wss://<host>` to connect to
	Host string
}

NewWebSocketConnArgs are the arguments to the NewWebSocketConn function you pass if you want to enable websockets.

type NonRetriable

type NonRetriable interface {
	error
	NonRetriable()
}
type RPCLink interface {
	Close(ctx context.Context) error
	RetryableRPC(ctx context.Context, times int, delay time.Duration, msg *amqp.Message) (*rpc.Response, error)
}

RPCLink is implemented by *rpc.Link

type ReceiveMode

type ReceiveMode int

ReceiveMode represents the lock style to use for a reciever - either `PeekLock` or `ReceiveAndDelete`

const (
	// PeekLock will lock messages as they are received and can be settled
	// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
	// functions.
	PeekLock ReceiveMode = 0
	// ReceiveAndDelete will delete messages as they are received.
	ReceiveAndDelete ReceiveMode = 1
)

type Retrier

type Retrier interface {
	// Copies the retrier. Retriers are stateful and must be copied
	// before starting a set of retries.
	Copy() Retrier

	// Exhausted is true if the retries were exhausted.
	Exhausted() bool

	// CurrentTry is the current try (0 for the first run before retries)
	CurrentTry() int

	// Try marks an attempt to call (first call to Try() does not sleep).
	// Will return false if the `ctx` is cancelled or if we exhaust our retries.
	//
	//    rp := RetryPolicy{Backoff:defaultBackoffPolicy, MaxRetries:5}
	//
	//    for rp.Try(ctx) {
	//       <your code>
	//    }
	//
	//    if rp.Cancelled() || rp.Exhausted() {
	//       // no more retries needed
	//    }
	//
	Try(ctx context.Context) bool
}

A retrier that allows you to do a basic for loop and get backoff and retry limits. See `Try` for more details on how to use it.

func NewBackoffRetrier

func NewBackoffRetrier(params BackoffRetrierParams) Retrier

NewBackoffRetrier creates a retrier that allows for configurable min/max times, jitter and maximum retries.

type RetryFnArgs added in v0.3.3

type RetryFnArgs struct {
	I int32
	// LastErr is the returned error from the previous loop.
	// If you have potentially expensive
	LastErr error
	// contains filtered or unexported fields
}

func (*RetryFnArgs) ResetAttempts added in v0.3.3

func (rf *RetryFnArgs) ResetAttempts()

ResetAttempts causes the current retry attempt number to be reset in time for the next recovery (should we fail). NOTE: Use of this should be pretty rare, it's really only needed when you have a situation like Receiver.ReceiveMessages() that can recovery but intentionally does not return.

type RetryOptions added in v0.3.3

type RetryOptions struct {
	// MaxRetries specifies the maximum number of attempts a failed operation will be retried
	// before producing an error.
	// The default value is three.  A value less than zero means one try and no retries.
	MaxRetries int32

	// RetryDelay specifies the initial amount of delay to use before retrying an operation.
	// The delay increases exponentially with each retry up to the maximum specified by MaxRetryDelay.
	// The default value is four seconds.  A value less than zero means no delay between retries.
	RetryDelay time.Duration

	// MaxRetryDelay specifies the maximum delay allowed before retrying an operation.
	// Typically the value is greater than or equal to the value specified in RetryDelay.
	// The default Value is 120 seconds.  A value less than zero means there is no cap.
	MaxRetryDelay time.Duration
}

RetryOptions represent the options for retries.

type ServiceBusError added in v0.3.3

type ServiceBusError struct {
	RecoveryKind recoveryKind
	// contains filtered or unexported fields
}

func ToSBE added in v0.3.3

func ToSBE(loggingCtx context.Context, err error) *ServiceBusError

ToSBE wraps the passed in 'err' with a proper error with one of either:

  • `fatalServiceBusError` if no recovery is possible.
  • `serviceBusError` if the error is recoverable. The `recoveryKind` field contains the type of recovery needed.

func (*ServiceBusError) AsError added in v0.3.3

func (sbe *ServiceBusError) AsError() error

func (*ServiceBusError) String added in v0.3.3

func (sbe *ServiceBusError) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL