internal

package
v1.2.3
Warning

This package is not in the latest version of its module.

Published: Oct 11, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.

Index

Constants

const Version = "v1.2.3"

Version is the semantic version number

Variables

var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
var RPCLinkClosedErr = errors.New("rpc link closed")

Functions

func IsCancelError

func IsCancelError(err error) bool

func IsFatalEHError

func IsFatalEHError(err error) bool

func IsNotAllowedError added in v0.6.0

func IsNotAllowedError(err error) bool

func IsOwnershipLostError added in v0.2.0

func IsOwnershipLostError(err error) bool

func IsQuickRecoveryError added in v0.1.1

func IsQuickRecoveryError(err error) bool

func NegotiateClaim

func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error

NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience

func NewErrNonRetriable

func NewErrNonRetriable(message string) error

func NewRPCLink

func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)

NewRPCLink will build a new request response link

func TransformError

func TransformError(err error) error

TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error, which has a 'Code' field that can be used programmatically. If it's not actionable, or if it's nil, it'll just be returned.

Types

type AMQPLink

type AMQPLink interface {
	Close(ctx context.Context) error
	LinkName() string
}

type AMQPReceiver

type AMQPReceiver = amqpwrap.AMQPReceiver

type AMQPReceiverCloser

type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser

type AMQPSender

type AMQPSender = amqpwrap.AMQPSender

type AMQPSenderCloser

type AMQPSenderCloser = amqpwrap.AMQPSenderCloser

type Closeable

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.

type FakeAMQPReceiver added in v0.2.0

type FakeAMQPReceiver struct {
	amqpwrap.AMQPReceiverCloser

	// ActiveCredits are incremented and decremented by IssueCredit and Receive.
	ActiveCredits int32

	// IssuedCredit just accumulates, so we can get an idea of how many credits we issued overall.
	IssuedCredit []uint32

	// CreditsSetFromOptions is similar to IssuedCredit, but only tracks credits added in via the LinkOptions.Credit
	// field (i.e., enabling prefetch).
	CreditsSetFromOptions int32

	// ManualCreditsSetFromOptions is the value of LinkOptions.ManualCredits.
	ManualCreditsSetFromOptions bool

	Messages []*amqp.Message

	NameForLink string

	CloseCalled int
	CloseError  error
}

func (*FakeAMQPReceiver) Close added in v0.2.0

func (r *FakeAMQPReceiver) Close(ctx context.Context) error

func (*FakeAMQPReceiver) Credits added in v0.2.0

func (r *FakeAMQPReceiver) Credits() uint32

func (*FakeAMQPReceiver) IssueCredit added in v0.2.0

func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error

func (*FakeAMQPReceiver) LinkName added in v0.2.0

func (r *FakeAMQPReceiver) LinkName() string

func (*FakeAMQPReceiver) Receive added in v0.2.0

func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
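
The credit bookkeeping described in the struct comments can be sketched as follows. This is a simplified, hypothetical stand-in (`creditTracker`, `message`), not the fake's actual implementation: it assumes IssueCredit adds to ActiveCredits and appends to IssuedCredit, and that each successful Receive consumes one active credit.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for *amqp.Message.
type message struct{ body string }

// creditTracker is a simplified stand-in for the fake receiver.
type creditTracker struct {
	ActiveCredits int32     // outstanding credits
	IssuedCredit  []uint32  // every credit grant, in order
	Messages      []*message // queued messages to hand out
}

// IssueCredit raises the active credit count and records the grant.
func (c *creditTracker) IssueCredit(credit uint32) error {
	c.ActiveCredits += int32(credit)
	c.IssuedCredit = append(c.IssuedCredit, credit)
	return nil
}

// Receive pops the next queued message and consumes one active credit.
func (c *creditTracker) Receive() (*message, error) {
	if len(c.Messages) == 0 {
		return nil, errors.New("no messages queued")
	}
	m := c.Messages[0]
	c.Messages = c.Messages[1:]
	c.ActiveCredits--
	return m, nil
}

func main() {
	r := &creditTracker{Messages: []*message{{body: "a"}, {body: "b"}}}
	_ = r.IssueCredit(2)
	if _, err := r.Receive(); err != nil {
		fmt.Println("receive failed:", err)
		return
	}
	fmt.Println("active:", r.ActiveCredits, "issued:", r.IssuedCredit)
}
```

Keeping the running total and the grant history separate lets a test assert both how many credits are outstanding and how they were issued.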

type FakeAMQPSender added in v0.2.0

type FakeAMQPSender struct {
	amqpwrap.AMQPSenderCloser
	CloseCalled int
	CloseError  error
}

func (*FakeAMQPSender) Close added in v0.2.0

func (s *FakeAMQPSender) Close(ctx context.Context) error

type FakeAMQPSession added in v0.2.0

type FakeAMQPSession struct {
	amqpwrap.AMQPSession
	NS          *FakeNSForPartClient
	CloseCalled int
}

func (*FakeAMQPSession) Close added in v0.2.0

func (sess *FakeAMQPSession) Close(ctx context.Context) error

func (*FakeAMQPSession) NewReceiver added in v0.2.0

func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)

func (*FakeAMQPSession) NewSender added in v0.2.0

func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)

type FakeNSForPartClient added in v0.2.0

type FakeNSForPartClient struct {
	NamespaceForAMQPLinks

	Receiver          *FakeAMQPReceiver
	NewReceiverErr    error
	NewReceiverCalled int

	Sender          *FakeAMQPSender
	NewSenderErr    error
	NewSenderCalled int

	RecoverFn func(ctx context.Context, clientRevision uint64) error
}

func (*FakeNSForPartClient) NegotiateClaim added in v0.2.0

func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)

func (*FakeNSForPartClient) NewAMQPSession added in v0.2.0

func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)

func (*FakeNSForPartClient) Recover added in v0.2.0

func (ns *FakeNSForPartClient) Recover(ctx context.Context, clientRevision uint64) error

type LinkRetrier added in v1.0.1

type LinkRetrier[LinkT AMQPLink] struct {
	// GetLink is set to [Links.GetLink]
	GetLink func(ctx context.Context, partitionID string) (LinkWithID[LinkT], error)

	// CloseLink is set to [Links.closePartitionLinkIfMatch]
	CloseLink func(ctx context.Context, partitionID string, linkName string) error

	// NSRecover is set to [Namespace.Recover]
	NSRecover func(ctx context.Context, connID uint64) error
}

func (LinkRetrier[LinkT]) RecoverIfNeeded added in v1.0.1

func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error

RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc.).

NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.

func (LinkRetrier[LinkT]) Retry added in v1.0.1

func (l LinkRetrier[LinkT]) Retry(ctx context.Context,
	eventName azlog.Event,
	operation string,
	partitionID string,
	retryOptions exported.RetryOptions,
	fn RetryCallback[LinkT]) error

Retry runs the fn argument in a loop, respecting retry counts. If connection/link failures occur it also takes care of running recovery logic to bring them back, or return an appropriate error if retries are exhausted.

type LinkWithID

type LinkWithID[LinkT AMQPLink] interface {
	ConnID() uint64
	Link() LinkT
	PartitionID() string
	Close(ctx context.Context) error
	String() string
}

LinkWithID is a readonly interface over the top of a linkState.

type Links

type Links[LinkT AMQPLink] struct {
	// contains filtered or unexported fields
}

func NewLinks

func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]

func (*Links[LinkT]) Close

func (l *Links[LinkT]) Close(ctx context.Context) error

func (*Links[LinkT]) GetLink

func (l *Links[LinkT]) GetLink(ctx context.Context, partitionID string) (LinkWithID[LinkT], error)

func (*Links[LinkT]) GetManagementLink

func (l *Links[LinkT]) GetManagementLink(ctx context.Context) (LinkWithID[amqpwrap.RPCLink], error)

func (*Links[LinkT]) Retry

func (l *Links[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error

func (*Links[LinkT]) RetryManagement added in v1.0.1

func (l *Links[LinkT]) RetryManagement(ctx context.Context, eventName azlog.Event, operation string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[amqpwrap.RPCLink]) error) error

type LinksForPartitionClient added in v0.1.1

type LinksForPartitionClient[LinkT AMQPLink] interface {
	// Retry is [Links.Retry]
	Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error

	// Close is [Links.Close]
	Close(ctx context.Context) error
}

LinksForPartitionClient are the functions that the PartitionClient uses within Links[T] (for unit testing only)

type Namespace

type Namespace struct {
	FQDN          string
	TokenProvider *sbauth.TokenProvider

	// NOTE: exported only so it can be checked in a test
	RetryOptions exported.RetryOptions
	// contains filtered or unexported fields
}

Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per client.

func NewNamespace

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)
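
The variadic NamespaceOption parameter follows the functional options pattern. The sketch below shows the pattern with hypothetical lower-case stand-ins (`namespace`, `namespaceOption`, `withFQDN`, `withUserAgent`, `newNamespace`); it does not reproduce what the real options configure, and the "root-agent" default is invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// namespace is a hypothetical stand-in for the configured object.
type namespace struct {
	FQDN      string
	userAgent string
}

// namespaceOption mutates a namespace during construction and may fail.
type namespaceOption func(ns *namespace) error

// withFQDN sets the fully qualified namespace, rejecting empty input.
func withFQDN(fqdn string) namespaceOption {
	return func(ns *namespace) error {
		if fqdn == "" {
			return errors.New("fully qualified namespace must not be empty")
		}
		ns.FQDN = fqdn
		return nil
	}
}

// withUserAgent appends to the default user-agent value.
func withUserAgent(ua string) namespaceOption {
	return func(ns *namespace) error {
		ns.userAgent = ns.userAgent + " " + ua
		return nil
	}
}

// newNamespace applies each option in order and stops at the first error.
func newNamespace(opts ...namespaceOption) (*namespace, error) {
	ns := &namespace{userAgent: "root-agent"}
	for _, opt := range opts {
		if err := opt(ns); err != nil {
			return nil, err
		}
	}
	return ns, nil
}

func main() {
	ns, err := newNamespace(
		withFQDN("example.servicebus.windows.net"),
		withUserAgent("my-app"),
	)
	if err != nil {
		fmt.Println("configuration failed:", err)
		return
	}
	fmt.Println(ns.FQDN, "|", ns.userAgent)
}
```

Because each option returns an error, invalid input surfaces from the constructor rather than at first use.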

func (*Namespace) Check

func (ns *Namespace) Check() error

Check returns an error if the namespace cannot be used (i.e., closed permanently), or nil otherwise.

func (*Namespace) Close

func (ns *Namespace) Close(ctx context.Context, permanently bool) error

Close closes the current cached client.

func (*Namespace) GetAMQPClientImpl

func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)

func (*Namespace) GetEntityAudience

func (ns *Namespace) GetEntityAudience(entityPath string) string

func (*Namespace) GetHTTPSHostURI

func (ns *Namespace) GetHTTPSHostURI() string

func (*Namespace) GetTokenForEntity

func (ns *Namespace) GetTokenForEntity(eventHub string) (*auth.Token, error)

func (*Namespace) NegotiateClaim

func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)

NegotiateClaim performs initial authentication and starts periodic refresh of credentials. The returned func cancels the refresh goroutine.

func (*Namespace) NewAMQPSession

func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)

NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.

func (*Namespace) NewRPCLink

func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)

func (*Namespace) Recover

func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) error

Recover destroys the currently held AMQP connection and recreates it, if needed.

NOTE: cancelling the context only cancels the initialization of a new AMQP connection - the previous connection is always closed.
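
One plausible reading of "if needed", given the connection ID parameter, is that recovery is skipped when the caller's ID is stale because another caller has already replaced the connection. The sketch below illustrates that idea only; the `namespace` type, its fields and `recoverConn` are hypothetical, and the real method's behaviour may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// namespace is a hypothetical stand-in holding a connection revision.
type namespace struct {
	mu     sync.Mutex
	connID uint64 // revision of the currently held connection
	dials  int    // how many times the connection was recreated
}

// recoverConn recreates the connection only if the caller saw the failure on
// the current connection. It returns the revision that is current afterwards.
func (ns *namespace) recoverConn(theirConnID uint64) uint64 {
	ns.mu.Lock()
	defer ns.mu.Unlock()

	if theirConnID != ns.connID {
		// The caller's connection was already replaced; nothing to do.
		return ns.connID
	}

	// Pretend to close the old connection and dial a new one.
	ns.connID++
	ns.dials++
	return ns.connID
}

func main() {
	ns := &namespace{connID: 1}
	fmt.Println("after first recover:", ns.recoverConn(1))
	fmt.Println("stale caller sees:", ns.recoverConn(1))
	fmt.Println("dials:", ns.dials)
}
```

Comparing revisions this way keeps several links that failed on the same connection from each triggering their own reconnect.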

type NamespaceForAMQPLinks

type NamespaceForAMQPLinks interface {
	NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
	NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
	NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)
	GetEntityAudience(entityPath string) string

	// Recover destroys the currently held AMQP connection and recreates it, if needed.
	//
	// NOTE: cancelling the context only cancels the initialization of a new AMQP
	// connection - the previous connection is always closed.
	Recover(ctx context.Context, clientRevision uint64) error

	Close(ctx context.Context, permanently bool) error
}

NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.

type NamespaceForManagementOps

type NamespaceForManagementOps interface {
	NamespaceForAMQPLinks
	GetTokenForEntity(eventHub string) (*auth.Token, error)
}

type NamespaceForProducerOrConsumer

type NamespaceForProducerOrConsumer = NamespaceForManagementOps

TODO: might just consolidate.

type NamespaceOption

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Event Hub namespace

func NamespaceWithConnectionString

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in an Event Hub connection string

func NamespaceWithRetryOptions

func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption

func NamespaceWithTLSConfig

func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption

NamespaceWithTLSConfig appends to the TLS config.

func NamespaceWithTokenCredential

func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption

NamespaceWithTokenCredential sets the token provider on the namespace. fullyQualifiedNamespace is the Event Hub namespace name (ex: myservicebus.servicebus.windows.net).

func NamespaceWithUserAgent

func NamespaceWithUserAgent(userAgent string) NamespaceOption

NamespaceWithUserAgent appends to the root user-agent value.

func NamespaceWithWebSocket

func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.WebSocketConnParams) (net.Conn, error)) NamespaceOption

NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://

type NamespaceWithNewAMQPLinks

type NamespaceWithNewAMQPLinks interface {
	Check() error
}

NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.

type NewLinksFn

type NewLinksFn[LinkT AMQPLink] func(ctx context.Context, session amqpwrap.AMQPSession, entityPath string, partitionID string) (LinkT, error)

type RPCError

type RPCError struct {
	Resp    *amqpwrap.RPCResponse
	Message string
}

RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.

func (RPCError) Error

func (e RPCError) Error() string

Error is a string representation of the error.

func (RPCError) RPCCode

func (e RPCError) RPCCode() int

RPCCode is the code that comes back in the rpc response. This code is intended for programs to react to programmatically.

type RPCLinkArgs

type RPCLinkArgs struct {
	Client   amqpwrap.AMQPClient
	Address  string
	LogEvent azlog.Event
}

type RPCLinkOption

type RPCLinkOption func(link *rpcLink) error

RPCLinkOption provides a way to customize the construction of a Link

type RecoveryKind

type RecoveryKind string

RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().

const (
	RecoveryKindNone  RecoveryKind = ""
	RecoveryKindFatal RecoveryKind = "fatal"
	RecoveryKindLink  RecoveryKind = "link"
	RecoveryKindConn  RecoveryKind = "connection"
)

func GetRecoveryKind

func GetRecoveryKind(err error) RecoveryKind

GetRecoveryKind determines the recovery type for non-session based links.
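
To illustrate how a caller might act on a RecoveryKind, the sketch below maps each kind to a set of recovery steps. The constants are copied from the declaration above; the `plan` type, `planFor` function and the specific mapping (link recovery closes the link, connection recovery also rebuilds the connection, fatal gives up) are assumptions made for this example.

```go
package main

import "fmt"

// RecoveryKind and its values mirror the declarations shown above.
type RecoveryKind string

const (
	RecoveryKindNone  RecoveryKind = ""
	RecoveryKindFatal RecoveryKind = "fatal"
	RecoveryKindLink  RecoveryKind = "link"
	RecoveryKindConn  RecoveryKind = "connection"
)

// plan is a hypothetical description of what a caller should do next.
type plan struct {
	CloseLink   bool // tear down the link so it is recreated on next use
	RecoverConn bool // also rebuild the underlying connection
	GiveUp      bool // stop retrying and surface the error
}

// planFor picks the smallest set of steps for the given kind.
func planFor(kind RecoveryKind) plan {
	switch kind {
	case RecoveryKindLink:
		return plan{CloseLink: true}
	case RecoveryKindConn:
		return plan{CloseLink: true, RecoverConn: true}
	case RecoveryKindFatal:
		return plan{GiveUp: true}
	default: // RecoveryKindNone: nothing to recover
		return plan{}
	}
}

func main() {
	for _, k := range []RecoveryKind{RecoveryKindNone, RecoveryKindLink, RecoveryKindConn, RecoveryKindFatal} {
		fmt.Printf("%q -> %+v\n", k, planFor(k))
	}
}
```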

type RetryCallback added in v1.0.1

type RetryCallback[LinkT AMQPLink] func(ctx context.Context, lwid LinkWithID[LinkT]) error

Directories

Path Synopsis
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
eh
Package mock is a generated GoMock package.
