internal

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2022 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.

Index

Constants

View Source
const (
	// Link/connection creation
	EventConn = "azsb.Conn"

	// authentication/claims negotiation
	EventAuth = "azsb.Auth"

	// receiver operations
	EventReceiver = "azsb.Receiver"

	// mgmt link
	EventMgmtLink = "azsb.Mgmt"

	// internal operations
	EventRetry = utils.EventRetry
)
View Source
const RecoveryKindConn recoveryKind = "connection"
View Source
const RecoveryKindFatal recoveryKind = "fatal"
View Source
const RecoveryKindLink recoveryKind = "link"
View Source
const RecoveryKindNone recoveryKind = ""
View Source
const Version = "v0.3.4"

TODO: this should move into a proper file. Need to resolve some interdependency issues between the public and internal packages first. Version is the semantic version number

Variables

This section is empty.

Functions

func CancelScheduledMessages added in v0.3.4

func CancelScheduledMessages(ctx context.Context, rpcLink RPCLink, seq []int64) error

CancelScheduledMessages allows for removal of messages that have been handed to the Service Bus broker for later delivery, but have not yet ben enqueued.

func GetRecoveryKind added in v0.3.3

func GetRecoveryKind(err error) recoveryKind

func GetSessionState added in v0.3.4

func GetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string) ([]byte, error)

GetSessionState retrieves state associated with the session.

func IsCancelError

func IsCancelError(err error) bool

func IsDrainingError

func IsDrainingError(err error) bool

func IsErrNotFound

func IsErrNotFound(err error) bool

IsErrNotFound returns true if the error argument is an ErrNotFound type

func IsFatalSBError added in v0.3.4

func IsFatalSBError(err error) bool
func NewRPCLink(conn *amqp.Client, address string, opts ...RPCLinkOption) (*rpcLink, error)

NewLink will build a new request response link

func PeekMessages added in v0.3.4

func PeekMessages(ctx context.Context, rpcLink RPCLink, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error)

func ReceiveDeferred added in v0.3.4

func ReceiveDeferred(ctx context.Context, rpcLink RPCLink, mode ReceiveMode, sequenceNumbers []int64) ([]*amqp.Message, error)

func RenewLocks added in v0.3.4

func RenewLocks(ctx context.Context, rpcLink RPCLink, linkName string, lockTokens []amqp.UUID) ([]time.Time, error)

RenewLocks renews the locks in a single 'com.microsoft:renew-lock' operation. NOTE: this function assumes all the messages received on the same link.

func RenewSessionLock added in v0.3.4

func RenewSessionLock(ctx context.Context, rpcLink RPCLink, sessionID string) (time.Time, error)

RenewSessionLocks renews a session lock.

func ScheduleMessages added in v0.3.4

func ScheduleMessages(ctx context.Context, rpcLink RPCLink, enqueueTime time.Time, messages []*amqp.Message) ([]int64, error)

ScheduleMessages will send a batch of messages to a Queue, schedule them to be enqueued, and return the sequence numbers that can be used to cancel each message.

func SendDisposition added in v0.3.4

func SendDisposition(ctx context.Context, rpcLink RPCLink, lockToken *amqp.UUID, state Disposition, propertiesToModify map[string]interface{}) error

SendDisposition allows you settle a message using the management link, rather than via your *amqp.Receiver. Use this if the receiver has been closed/lost or if the message isn't associated with a link (ex: deferred messages).

func SetSessionState added in v0.3.4

func SetSessionState(ctx context.Context, rpcLink RPCLink, sessionID string, state []byte) error

SetSessionState sets the state associated with the session.

Types

type AMQPLinks interface {
	EntityPath() string
	ManagementPath() string

	Audience() string

	// Get will initialize a session and call its link.linkCreator function.
	// If this link has been closed via Close() it will return an non retriable error.
	Get(ctx context.Context) (*LinksWithID, error)

	// Retry will run your callback, recovering links when necessary.
	Retry(ctx context.Context, name string, fn RetryWithLinksFn, o utils.RetryOptions) error

	// RecoverIfNeeded will check if an error requires recovery, and will recover
	// the link or, possibly, the connection.
	RecoverIfNeeded(ctx context.Context, linkID LinkID, err error) error

	// Close will close the the link.
	// If permanent is true the link will not be auto-recreated if Get/Recover
	// are called. All functions will return `ErrLinksClosed`
	Close(ctx context.Context, permanent bool) error

	// ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.
	ClosedPermanently() bool
}
func NewAMQPLinks(ns NamespaceForAMQPLinks, entityPath string, createLink CreateLinkFunc) AMQPLinks

NewAMQPLinks creates a session, starts the claim refresher and creates an associated management link for a specific entity path.

type AMQPLinksImpl added in v0.3.4

type AMQPLinksImpl struct {

	// RPCLink lets you interact with the $management link for your entity.
	RPCLink RPCLink

	// these are populated by your `createLinkFunc` when you construct
	// the amqpLinks
	Sender   AMQPSenderCloser
	Receiver AMQPReceiverCloser
	// contains filtered or unexported fields
}

AMQPLinksImpl manages the set of AMQP links (and detritus) typically needed to work

within Service Bus:

- An *goamqp.Sender or *goamqp.Receiver AMQP link (could also be 'both' if needed) - A `$management` link - an *goamqp.Session

State management can be done through Recover (close and reopen), Close (close permanently, return failures) and Get() (retrieve latest version of all AMQPLinksImpl, or create if needed).

func (*AMQPLinksImpl) Audience added in v0.3.4

func (l *AMQPLinksImpl) Audience() string

EntityPath is the audience for the queue/topic/subscription.

func (*AMQPLinksImpl) Close added in v0.3.4

func (l *AMQPLinksImpl) Close(ctx context.Context, permanent bool) error

Close will close the the link permanently. Any further calls to Get()/Recover() to return ErrLinksClosed.

func (*AMQPLinksImpl) ClosedPermanently added in v0.3.4

func (l *AMQPLinksImpl) ClosedPermanently() bool

ClosedPermanently is true if AMQPLinks.Close(ctx, true) has been called.

func (*AMQPLinksImpl) EntityPath added in v0.3.4

func (l *AMQPLinksImpl) EntityPath() string

EntityPath is the full entity path for the queue/topic/subscription.

func (*AMQPLinksImpl) Get added in v0.3.4

func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error)

Get will initialize a session and call its link.linkCreator function. If this link has been closed via Close() it will return an non retriable error.

func (*AMQPLinksImpl) ManagementPath added in v0.3.4

func (links *AMQPLinksImpl) ManagementPath() string

ManagementPath is the management path for the associated entity.

func (*AMQPLinksImpl) RecoverIfNeeded added in v0.3.4

func (links *AMQPLinksImpl) RecoverIfNeeded(ctx context.Context, theirID LinkID, origErr error) error

Recover will recover the links or the connection, depending on the severity of the error.

func (*AMQPLinksImpl) Retry added in v0.3.4

type AMQPReceiver

type AMQPReceiver interface {
	IssueCredit(credit uint32) error
	DrainCredit(ctx context.Context) error
	Receive(ctx context.Context) (*amqp.Message, error)
	Prefetched(ctx context.Context) (*amqp.Message, error)

	// settlement functions
	AcceptMessage(ctx context.Context, msg *amqp.Message) error
	RejectMessage(ctx context.Context, msg *amqp.Message, e *amqp.Error) error
	ReleaseMessage(ctx context.Context, msg *amqp.Message) error
	ModifyMessage(ctx context.Context, msg *amqp.Message, deliveryFailed, undeliverableHere bool, messageAnnotations amqp.Annotations) error

	LinkName() string
	LinkSourceFilterValue(name string) interface{}
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPReceiverCloser

type AMQPReceiverCloser interface {
	AMQPReceiver
	Close(ctx context.Context) error
}

AMQPReceiver is implemented by *amqp.Receiver

type AMQPSender

type AMQPSender interface {
	Send(ctx context.Context, msg *amqp.Message) error
	MaxMessageSize() uint64
}

AMQPSender is implemented by *amqp.Sender

type AMQPSenderCloser

type AMQPSenderCloser interface {
	AMQPSender
	Close(ctx context.Context) error
}

AMQPSenderCloser is implemented by *amqp.Sender

type AMQPSession

type AMQPSession interface {
	NewReceiver(opts ...amqp.LinkOption) (*amqp.Receiver, error)
	NewSender(opts ...amqp.LinkOption) (*amqp.Sender, error)
}

AMQPSession is implemented by *amqp.Session

type AMQPSessionCloser

type AMQPSessionCloser interface {
	AMQPSession
	Close(ctx context.Context) error
}

AMQPSessionCloser is implemented by *amqp.Session

type Closeable

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.

type CreateLinkFunc

type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)

CreateLinkFunc creates the links, using the given session. Typically you'll only create either an *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.

type Disposition

type Disposition struct {
	Status                DispositionStatus
	LockTokens            []*uuid.UUID
	DeadLetterReason      *string
	DeadLetterDescription *string
}

type DispositionStatus

type DispositionStatus string
const (
	CompletedDisposition DispositionStatus = "completed"
	AbandonedDisposition DispositionStatus = "abandoned"
	SuspendedDisposition DispositionStatus = "suspended"
	DeferredDisposition  DispositionStatus = "defered"
)

type ErrAMQP

type ErrAMQP rpc.Response

ErrAMQP indicates that the server communicated an AMQP error with a particular

func (ErrAMQP) Error

func (e ErrAMQP) Error() string

type ErrConnectionClosed

type ErrConnectionClosed string

ErrConnectionClosed indicates that the connection has been closed.

func (ErrConnectionClosed) Error

func (e ErrConnectionClosed) Error() string

type ErrIncorrectType

type ErrIncorrectType struct {
	Key          string
	ExpectedType reflect.Type
	ActualValue  interface{}
}

ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func NewErrIncorrectType

func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType

NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.

func (ErrIncorrectType) Error

func (e ErrIncorrectType) Error() string

type ErrMalformedMessage

type ErrMalformedMessage string

ErrMalformedMessage indicates that a message was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.

func (ErrMalformedMessage) Error

func (e ErrMalformedMessage) Error() string

type ErrMissingField

type ErrMissingField string

ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrMissingField) Error

func (e ErrMissingField) Error() string

type ErrNoMessages

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.

func (ErrNoMessages) Error

func (e ErrNoMessages) Error() string

type ErrNonRetriable added in v0.3.3

type ErrNonRetriable struct {
	Message string
}

func (ErrNonRetriable) Error added in v0.3.3

func (e ErrNonRetriable) Error() string

type ErrNotFound

type ErrNotFound struct {
	EntityPath string
}

ErrNotFound is returned when an entity is not found (404)

func (ErrNotFound) Error

func (e ErrNotFound) Error() string
type FakeAMQPLinks struct {
	AMQPLinks

	Closed int

	// values to be returned for each `Get` call
	Revision LinkID
	Receiver AMQPReceiver
	Sender   AMQPSender
	RPC      RPCLink
	Err      error
	// contains filtered or unexported fields
}

func (*FakeAMQPLinks) Close

func (l *FakeAMQPLinks) Close(ctx context.Context, permanently bool) error

func (*FakeAMQPLinks) ClosedPermanently

func (l *FakeAMQPLinks) ClosedPermanently() bool

func (*FakeAMQPLinks) Get

func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error)

func (*FakeAMQPLinks) Retry added in v0.3.4

type FakeAMQPReceiver

type FakeAMQPReceiver struct {
	AMQPReceiver
	Closed           int
	Drain            int
	RequestedCredits uint32

	ReceiveResults chan struct {
		M *amqp.Message
		E error
	}
}

func (*FakeAMQPReceiver) Close

func (r *FakeAMQPReceiver) Close(ctx context.Context) error

func (*FakeAMQPReceiver) DrainCredit

func (r *FakeAMQPReceiver) DrainCredit(ctx context.Context) error

func (*FakeAMQPReceiver) IssueCredit added in v0.3.4

func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error

func (*FakeAMQPReceiver) Prefetched added in v0.3.4

func (r *FakeAMQPReceiver) Prefetched(ctx context.Context) (*amqp.Message, error)

func (*FakeAMQPReceiver) Receive added in v0.3.4

func (r *FakeAMQPReceiver) Receive(ctx context.Context) (*amqp.Message, error)

type FakeAMQPSender

type FakeAMQPSender struct {
	Closed int
	AMQPSender
}

func (*FakeAMQPSender) Close

func (s *FakeAMQPSender) Close(ctx context.Context) error

type FakeAMQPSession

type FakeAMQPSession struct {
	AMQPSessionCloser
	// contains filtered or unexported fields
}

func (*FakeAMQPSession) Close

func (s *FakeAMQPSession) Close(ctx context.Context) error

type FakeNS

type FakeNS struct {
	RPCLink   RPCLink
	Session   AMQPSessionCloser
	AMQPLinks *FakeAMQPLinks
	// contains filtered or unexported fields
}

func (*FakeNS) CloseIfNeeded added in v0.3.4

func (ns *FakeNS) CloseIfNeeded(ctx context.Context, clientRevision uint64) error

func (*FakeNS) GetEntityAudience

func (ns *FakeNS) GetEntityAudience(entityPath string) string

func (*FakeNS) NegotiateClaim

func (ns *FakeNS) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
func (ns *FakeNS) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks

func (*FakeNS) NewAMQPSession

func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
func (ns *FakeNS) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)

func (*FakeNS) Recover

func (ns *FakeNS) Recover(ctx context.Context, clientRevision uint64) (bool, error)

type LinkID added in v0.3.4

type LinkID struct {
	// Conn is the ID of the connection we used to create our links.
	Conn uint64

	// Link is the ID of our current link.
	Link uint64
}

LinkID is ID that represent our current link and the client used to create it. These are used when trying to determine what parts need to be recreated when an error occurs, to prevent recovering a connection/link repeatedly. See amqpLinks.RecoverIfNeeded() for usage.

type LinksWithID added in v0.3.4

type LinksWithID struct {
	Sender   AMQPSender
	Receiver AMQPReceiver
	RPC      RPCLink
	ID       LinkID
}

type Namespace

type Namespace struct {
	FQDN          string
	TokenProvider *sbauth.TokenProvider
	// contains filtered or unexported fields
}

Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per ServiceBusClient.

func NewNamespace

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)

func (*Namespace) Close

func (ns *Namespace) Close(ctx context.Context) error

Close closes the current cached client.

func (*Namespace) GetAMQPClientImpl added in v0.3.4

func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (*amqp.Client, uint64, error)

func (*Namespace) GetEntityAudience

func (ns *Namespace) GetEntityAudience(entityPath string) string

func (*Namespace) GetHTTPSHostURI

func (ns *Namespace) GetHTTPSHostURI() string

func (*Namespace) NegotiateClaim

func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)

negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.

func (ns *Namespace) NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks

NewAMQPLinks creates an AMQPLinks struct, which groups together the commonly needed links for working with Service Bus.

func (*Namespace) NewAMQPSession

func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)

NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.

func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)

NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.

func (*Namespace) Recover

func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) (bool, error)

Recover destroys the currently held AMQP connection and recreates it, if needed. If a new is actually created (rather than just cached) then the returned bool will be true. Any links that were created from the original connection will need to be recreated.

type NamespaceForAMQPLinks interface {
	NegotiateClaim(ctx context.Context, entityPath string) (func() <-chan struct{}, error)
	NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
	NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)
	GetEntityAudience(entityPath string) string
	Recover(ctx context.Context, clientRevision uint64) (bool, error)
}

NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.

type NamespaceOption

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Service Bus namespace

func NamespaceWithConnectionString

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string

func NamespaceWithRetryOptions added in v0.3.4

func NamespaceWithRetryOptions(retryOptions utils.RetryOptions) NamespaceOption

func NamespaceWithTLSConfig

func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption

NamespaceWithTLSConfig appends to the TLS config.

func NamespaceWithTokenCredential added in v0.3.4

func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption

NamespaceWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net)

func NamespaceWithUserAgent

func NamespaceWithUserAgent(userAgent string) NamespaceOption

NamespaceWithUserAgent appends to the root user-agent value.

func NamespaceWithWebSocket

func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error)) NamespaceOption

NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://

type NamespaceWithNewAMQPLinks interface {
	NewAMQPLinks(entityPath string, createLinkFunc CreateLinkFunc) AMQPLinks
}

NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.

type NewWebSocketConnArgs added in v0.3.2

type NewWebSocketConnArgs struct {

	// Host is the the `wss://<host>` to connect to
	Host string
}

NewWebSocketConnArgs are the arguments to the NewWebSocketConn function you pass if you want to enable websockets.

type RPCLink interface {
	Close(ctx context.Context) error
	RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
}

RPCLink is implemented by *rpc.Link

type RPCLinkOption added in v0.3.4

type RPCLinkOption func(link *rpcLink) error

RPCLinkOption provides a way to customize the construction of a Link

type RPCResponse added in v0.3.4

type RPCResponse struct {
	Code        int
	Description string
	Message     *amqp.Message
}

RPCResponse is the simplified response structure from an RPC like call

type ReceiveMode

type ReceiveMode int

ReceiveMode represents the lock style to use for a reciever - either `PeekLock` or `ReceiveAndDelete`

const (
	// PeekLock will lock messages as they are received and can be settled
	// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
	// functions.
	PeekLock ReceiveMode = 0
	// ReceiveAndDelete will delete messages as they are received.
	ReceiveAndDelete ReceiveMode = 1
)

type RetryWithLinksFn added in v0.3.4

type RetryWithLinksFn func(ctx context.Context, lwid *LinksWithID, args *utils.RetryFnArgs) error

type SBErrInfo added in v0.3.4

type SBErrInfo struct {
	RecoveryKind recoveryKind
	// contains filtered or unexported fields
}

func GetSBErrInfo added in v0.3.4

func GetSBErrInfo(err error) *SBErrInfo

GetSBErrInfo wraps the passed in 'err' with a proper error with one of either:

  • `fatalServiceBusError` if no recovery is possible.
  • `serviceBusError` if the error is recoverable. The `recoveryKind` field contains the type of recovery needed.

func (*SBErrInfo) AsError added in v0.3.4

func (sbe *SBErrInfo) AsError() error

func (*SBErrInfo) String added in v0.3.4

func (sbe *SBErrInfo) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL