Documentation ¶
Overview ¶
Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.
Index ¶
- Constants
- Variables
- func IsCancelError(err error) bool
- func IsFatalEHError(err error) bool
- func IsNotAllowedError(err error) bool
- func IsOwnershipLostError(err error) bool
- func IsQuickRecoveryError(err error) bool
- func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, ...) error
- func NewErrNonRetriable(message string) error
- func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)
- func TransformError(err error) error
- type AMQPLink
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type Closeable
- type FakeAMQPReceiver
- func (r *FakeAMQPReceiver) Close(ctx context.Context) error
- func (r *FakeAMQPReceiver) Credits() uint32
- func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
- func (r *FakeAMQPReceiver) LinkName() string
- func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
- type FakeAMQPSender
- type FakeAMQPSession
- func (sess *FakeAMQPSession) Close(ctx context.Context) error
- func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, partitionID string, ...) (amqpwrap.AMQPReceiverCloser, error)
- func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, partitionID string, ...) (AMQPSenderCloser, error)
- type FakeNSForPartClient
- func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *FakeNSForPartClient) Recover(ctx context.Context, clientRevision uint64) error
- type LinkRetrier
- type LinkWithID
- type Links
- func (l *Links[LinkT]) Close(ctx context.Context) error
- func (l *Links[LinkT]) GetLink(ctx context.Context, partitionID string) (LinkWithID[LinkT], error)
- func (l *Links[LinkT]) GetManagementLink(ctx context.Context) (LinkWithID[amqpwrap.RPCLink], error)
- func (l *Links[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, ...) error
- func (l *Links[LinkT]) RetryManagement(ctx context.Context, eventName azlog.Event, operation string, ...) error
- type LinksForPartitionClient
- type Namespace
- func (ns *Namespace) Check() error
- func (ns *Namespace) Close(ctx context.Context, permanently bool) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) GetTokenForEntity(eventHub string) (*auth.Token, error)
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) error
- type NamespaceForAMQPLinks
- type NamespaceForManagementOps
- type NamespaceForProducerOrConsumer
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(...) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NewLinksFn
- type RPCError
- type RPCLinkArgs
- type RPCLinkOption
- type RecoveryKind
- type RetryCallback
Constants ¶
const Version = "v1.2.3"
Version is the semantic version number
Variables ¶
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
var RPCLinkClosedErr = errors.New("rpc link closed")
Functions ¶
func IsCancelError ¶
func IsFatalEHError ¶
func IsNotAllowedError ¶ added in v0.6.0
func IsOwnershipLostError ¶ added in v0.2.0
func IsQuickRecoveryError ¶ added in v0.1.1
func NegotiateClaim ¶
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error
NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NewErrNonRetriable ¶
func NewRPCLink ¶
NewRPCLink will build a new request response link
func TransformError ¶
TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error which has a 'Code' field that can be used programmatically. If it's not actionable or if it's nil it'll just be returned.
Types ¶
type AMQPReceiver ¶
type AMQPReceiver = amqpwrap.AMQPReceiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
type AMQPSender ¶
type AMQPSender = amqpwrap.AMQPSender
type AMQPSenderCloser ¶
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type FakeAMQPReceiver ¶ added in v0.2.0
type FakeAMQPReceiver struct { amqpwrap.AMQPReceiverCloser // ActiveCredits are incremented and decremented by IssueCredit and Receive. ActiveCredits int32 // IssuedCredit just accumulates, so we can get an idea of how many credits we issued overall. IssuedCredit []uint32 // CreditsSetFromOptions is similar to issuedCredit, but only tracks credits added in via the LinkOptions.Credit // field (ie, enabling prefetch). CreditsSetFromOptions int32 // ManualCreditsSetFromOptions is the value of the LinkOptions.ManualCredits value. ManualCreditsSetFromOptions bool Messages []*amqp.Message NameForLink string CloseCalled int CloseError error }
func (*FakeAMQPReceiver) Close ¶ added in v0.2.0
func (r *FakeAMQPReceiver) Close(ctx context.Context) error
func (*FakeAMQPReceiver) Credits ¶ added in v0.2.0
func (r *FakeAMQPReceiver) Credits() uint32
func (*FakeAMQPReceiver) IssueCredit ¶ added in v0.2.0
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
func (*FakeAMQPReceiver) LinkName ¶ added in v0.2.0
func (r *FakeAMQPReceiver) LinkName() string
type FakeAMQPSender ¶ added in v0.2.0
type FakeAMQPSender struct { amqpwrap.AMQPSenderCloser CloseCalled int CloseError error }
type FakeAMQPSession ¶ added in v0.2.0
type FakeAMQPSession struct { amqpwrap.AMQPSession NS *FakeNSForPartClient CloseCalled int }
func (*FakeAMQPSession) Close ¶ added in v0.2.0
func (sess *FakeAMQPSession) Close(ctx context.Context) error
func (*FakeAMQPSession) NewReceiver ¶ added in v0.2.0
func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, partitionID string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)
func (*FakeAMQPSession) NewSender ¶ added in v0.2.0
func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, partitionID string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
type FakeNSForPartClient ¶ added in v0.2.0
type FakeNSForPartClient struct { NamespaceForAMQPLinks Receiver *FakeAMQPReceiver NewReceiverErr error NewReceiverCalled int Sender *FakeAMQPSender NewSenderErr error NewSenderCalled int RecoverFn func(ctx context.Context, clientRevision uint64) error }
func (*FakeNSForPartClient) NegotiateClaim ¶ added in v0.2.0
func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
func (*FakeNSForPartClient) NewAMQPSession ¶ added in v0.2.0
func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
type LinkRetrier ¶ added in v1.0.1
type LinkRetrier[LinkT AMQPLink] struct { // GetLink is set to [Links.GetLink] GetLink func(ctx context.Context, partitionID string) (LinkWithID[LinkT], error) // CloseLink is set to [Links.closePartitionLinkIfMatch] CloseLink func(ctx context.Context, partitionID string, linkName string) error // NSRecover is set to [Namespace.Recover] NSRecover func(ctx context.Context, connID uint64) error }
func (LinkRetrier[LinkT]) RecoverIfNeeded ¶ added in v1.0.1
func (l LinkRetrier[LinkT]) RecoverIfNeeded(ctx context.Context, err error) error
RecoverIfNeeded will check the error and pick the correct minimal recovery pattern (none, link only, connection and link, etc.). NOTE: if 'ctx' is cancelled this function will still close out all the connections/links involved.
func (LinkRetrier[LinkT]) Retry ¶ added in v1.0.1
func (l LinkRetrier[LinkT]) Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn RetryCallback[LinkT]) error
Retry runs the fn argument in a loop, respecting retry counts. If connection/link failures occur it also takes care of running recovery logic to bring them back, or return an appropriate error if retries are exhausted.
type LinkWithID ¶
type LinkWithID[LinkT AMQPLink] interface { ConnID() uint64 Link() LinkT PartitionID() string Close(ctx context.Context) error String() string }
LinkWithID is a readonly interface over the top of a linkState.
type Links ¶
type Links[LinkT AMQPLink] struct { // contains filtered or unexported fields }
func NewLinks ¶
func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]
func (*Links[LinkT]) GetManagementLink ¶
type LinksForPartitionClient ¶ added in v0.1.1
type LinksForPartitionClient[LinkT AMQPLink] interface { // Retry is [Links.Retry] Retry(ctx context.Context, eventName azlog.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error // Close is [Links.Close] Close(ctx context.Context) error }
LinksForPartitionClient are the functions that the PartitionClient uses within Links[T] (for unit testing only)
type Namespace ¶
type Namespace struct { FQDN string TokenProvider *sbauth.TokenProvider // NOTE: exported only so it can be checked in a test RetryOptions exported.RetryOptions // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per client.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) Check ¶
Check returns an error if the namespace cannot be used (ie, closed permanently), or nil otherwise.
func (*Namespace) GetAMQPClientImpl ¶
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) GetTokenForEntity ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
NegotiateClaim performs initial authentication and starts periodic refresh of credentials. The returned func is to cancel() the refresh goroutine.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error) GetEntityAudience(entityPath string) string // Recover destroys the currently held AMQP connection and recreates it, if needed. // // NOTE: cancelling the context only cancels the initialization of a new AMQP // connection - the previous connection is always closed. Recover(ctx context.Context, clientRevision uint64) error Close(ctx context.Context, permanently bool) error }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForManagementOps ¶
type NamespaceForManagementOps interface { NamespaceForAMQPLinks GetTokenForEntity(eventHub string) (*auth.Token, error) }
type NamespaceForProducerOrConsumer ¶
type NamespaceForProducerOrConsumer = NamespaceForManagementOps
TODO: might just consolidate.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Event Hub namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in an Event Hub connection string.
func NamespaceWithRetryOptions ¶
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace. fullyQualifiedNamespace is the Event Hub namespace name (ex: myservicebus.servicebus.windows.net).
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.WebSocketConnParams) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface {
Check() error
}
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NewLinksFn ¶
type RPCError ¶
type RPCError struct { Resp *amqpwrap.RPCResponse Message string }
RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.
type RPCLinkArgs ¶
type RPCLinkArgs struct { Client amqpwrap.AMQPClient Address string LogEvent azlog.Event }
type RPCLinkOption ¶
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RecoveryKind ¶
type RecoveryKind string
RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().
const ( RecoveryKindNone RecoveryKind = "" RecoveryKindFatal RecoveryKind = "fatal" RecoveryKindLink RecoveryKind = "link" RecoveryKindConn RecoveryKind = "connection" )
func GetRecoveryKind ¶
func GetRecoveryKind(err error) RecoveryKind
GetRecoveryKind determines the recovery type for non-session based links.
type RetryCallback ¶ added in v1.0.1
type RetryCallback[LinkT AMQPLink] func(ctx context.Context, lwid LinkWithID[LinkT]) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
amqpwrap | Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. |
auth | Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. |
mock | Package mock is a generated GoMock package. |