Documentation
¶
Overview ¶
Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.
Index ¶
- Constants
- Variables
- func IsCancelError(err error) bool
- func IsDrainingError(err error) bool
- func IsErrNotFound(err error) bool
- func IsFatalEHError(err error) bool
- func IsNotAllowedError(err error) bool
- func IsOwnershipLostError(err error) bool
- func IsQuickRecoveryError(err error) bool
- func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, ...) error
- func NewErrNonRetriable(message string) error
- func NewRPCLink(ctx context.Context, args RPCLinkArgs) (amqpwrap.RPCLink, error)
- func TransformError(err error) error
- type AMQPLink
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type Closeable
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNotFound
- type FakeAMQPReceiver
- func (r *FakeAMQPReceiver) Close(ctx context.Context) error
- func (r *FakeAMQPReceiver) Credits() uint32
- func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
- func (r *FakeAMQPReceiver) LinkName() string
- func (r *FakeAMQPReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
- type FakeAMQPSender
- type FakeAMQPSession
- func (sess *FakeAMQPSession) Close(ctx context.Context) error
- func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)
- func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
- type FakeNSForPartClient
- func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *FakeNSForPartClient) Recover(ctx context.Context, clientRevision uint64) error
- type LinkWithID
- type Links
- func (l *Links[LinkT]) Close(ctx context.Context) error
- func (l *Links[LinkT]) CloseLink(ctx context.Context, partitionID string) error
- func (l *Links[LinkT]) GetLink(ctx context.Context, partitionID string) (*LinkWithID[LinkT], error)
- func (l *Links[LinkT]) GetManagementLink(ctx context.Context) (LinkWithID[amqpwrap.RPCLink], error)
- func (l *Links[LinkT]) RecoverIfNeeded(ctx context.Context, partitionID string, lwid *LinkWithID[LinkT], err error) error
- func (l *Links[LinkT]) Retry(ctx context.Context, eventName log.Event, operation string, partitionID string, ...) error
- type LinksForPartitionClient
- type Namespace
- func (ns *Namespace) Check() error
- func (ns *Namespace) Close(ctx context.Context, permanently bool) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) GetTokenForEntity(eventHub string) (*auth.Token, error)
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) error
- type NamespaceForAMQPLinks
- type NamespaceForManagementOps
- type NamespaceForProducerOrConsumer
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(...) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NewLinksFn
- type RPCError
- type RPCLinkArgs
- type RPCLinkOption
- type RecoveryKind
Constants ¶
const Version = "v1.0.0"
Version is the semantic version number
Variables ¶
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
Functions ¶
func IsCancelError ¶
func IsDrainingError ¶
func IsErrNotFound ¶
IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsFatalEHError ¶
func IsNotAllowedError ¶ added in v0.6.0
func IsOwnershipLostError ¶ added in v0.2.0
func IsQuickRecoveryError ¶ added in v0.1.1
func NegotiateClaim ¶
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error
NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NewErrNonRetriable ¶
func NewRPCLink ¶
NewRPCLink will build a new request response link
func TransformError ¶
TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error which has a 'Code' field that can be used programmatically. If it's not actionable or if it's nil it'll just be returned.
Types ¶
type AMQPReceiver ¶
type AMQPReceiver = amqpwrap.AMQPReceiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
type AMQPSender ¶
type AMQPSender = amqpwrap.AMQPSender
type AMQPSenderCloser ¶
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type ErrAMQP ¶
type ErrAMQP amqpwrap.RPCResponse
ErrAMQP indicates that the server communicated an AMQP error, carried as an amqpwrap.RPCResponse.
type ErrConnectionClosed ¶
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func NewErrIncorrectType ¶
func NewErrIncorrectType(key string, expected, actual any) ErrIncorrectType
NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.
func (ErrIncorrectType) Error ¶
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message expected in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶
func (e ErrMissingField) Error() string
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type FakeAMQPReceiver ¶ added in v0.2.0
type FakeAMQPReceiver struct { amqpwrap.AMQPReceiverCloser // ActiveCredits are incremented and decremented by IssueCredit and Receive. ActiveCredits int32 // IssuedCredit just accumulates, so we can get an idea of how many credits we issued overall. IssuedCredit []uint32 // CreditsSetFromOptions is similar to issuedCredit, but only tracks credits added in via the LinkOptions.Credit // field (ie, enabling prefetch). CreditsSetFromOptions int32 // ManualCreditsSetFromOptions is the value of the LinkOptions.ManualCredits value. ManualCreditsSetFromOptions bool Messages []*amqp.Message NameForLink string CloseCalled int CloseError error }
func (*FakeAMQPReceiver) Close ¶ added in v0.2.0
func (r *FakeAMQPReceiver) Close(ctx context.Context) error
func (*FakeAMQPReceiver) Credits ¶ added in v0.2.0
func (r *FakeAMQPReceiver) Credits() uint32
func (*FakeAMQPReceiver) IssueCredit ¶ added in v0.2.0
func (r *FakeAMQPReceiver) IssueCredit(credit uint32) error
func (*FakeAMQPReceiver) LinkName ¶ added in v0.2.0
func (r *FakeAMQPReceiver) LinkName() string
type FakeAMQPSender ¶ added in v0.2.0
type FakeAMQPSender struct { amqpwrap.AMQPSenderCloser CloseCalled int CloseError error }
type FakeAMQPSession ¶ added in v0.2.0
type FakeAMQPSession struct { amqpwrap.AMQPSession NS *FakeNSForPartClient CloseCalled int }
func (*FakeAMQPSession) Close ¶ added in v0.2.0
func (sess *FakeAMQPSession) Close(ctx context.Context) error
func (*FakeAMQPSession) NewReceiver ¶ added in v0.2.0
func (sess *FakeAMQPSession) NewReceiver(ctx context.Context, source string, opts *amqp.ReceiverOptions) (amqpwrap.AMQPReceiverCloser, error)
func (*FakeAMQPSession) NewSender ¶ added in v0.2.0
func (sess *FakeAMQPSession) NewSender(ctx context.Context, target string, opts *amqp.SenderOptions) (AMQPSenderCloser, error)
type FakeNSForPartClient ¶ added in v0.2.0
type FakeNSForPartClient struct { NamespaceForAMQPLinks Receiver *FakeAMQPReceiver NewReceiverErr error NewReceiverCalled int Sender *FakeAMQPSender NewSenderErr error NewSenderCalled int RecoverFn func(ctx context.Context, clientRevision uint64) error }
func (*FakeNSForPartClient) NegotiateClaim ¶ added in v0.2.0
func (ns *FakeNSForPartClient) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
func (*FakeNSForPartClient) NewAMQPSession ¶ added in v0.2.0
func (ns *FakeNSForPartClient) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
type LinkWithID ¶
type LinkWithID[LinkT AMQPLink] struct { // ConnID is an arbitrary (but unique) integer that represents the // current connection. This comes back from the Namespace, anytime // it hands back a connection. ConnID uint64 // Link will be an amqp.Receiver or amqp.Sender link. Link LinkT // PartitionID, if available. PartitionID string }
func (*LinkWithID[LinkT]) String ¶ added in v0.4.0
func (lwid *LinkWithID[LinkT]) String() string
type Links ¶
type Links[LinkT AMQPLink] struct { // contains filtered or unexported fields }
func NewLinks ¶
func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]
func (*Links[LinkT]) GetManagementLink ¶
func (*Links[LinkT]) RecoverIfNeeded ¶
type LinksForPartitionClient ¶ added in v0.1.1
type LinksForPartitionClient[LinkT AMQPLink] interface { RecoverIfNeeded(ctx context.Context, partitionID string, lwid *LinkWithID[LinkT], err error) error Retry(ctx context.Context, eventName log.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error Close(ctx context.Context) error }
LinksForPartitionClient are the functions that the PartitionClient uses within Links[T] (for unit testing only)
type Namespace ¶
type Namespace struct { FQDN string TokenProvider *sbauth.TokenProvider // NOTE: exported only so it can be checked in a test RetryOptions exported.RetryOptions // contains filtered or unexported fields }
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per client.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) Check ¶
Check returns an error if the namespace cannot be used (i.e., closed permanently), or nil otherwise.
func (*Namespace) GetAMQPClientImpl ¶
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) GetTokenForEntity ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
NegotiateClaim performs initial authentication and starts periodic refresh of credentials. The returned func is to cancel() the refresh goroutine.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface { NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) NewRPCLink(ctx context.Context, managementPath string) (amqpwrap.RPCLink, uint64, error) GetEntityAudience(entityPath string) string // Recover destroys the currently held AMQP connection and recreates it, if needed. // // NOTE: cancelling the context only cancels the initialization of a new AMQP // connection - the previous connection is always closed. Recover(ctx context.Context, clientRevision uint64) error Close(ctx context.Context, permanently bool) error }
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForManagementOps ¶
type NamespaceForManagementOps interface { NamespaceForAMQPLinks GetTokenForEntity(eventHub string) (*auth.Token, error) }
type NamespaceForProducerOrConsumer ¶
type NamespaceForProducerOrConsumer = NamespaceForManagementOps
TODO: might just consolidate.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Event Hub namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in an Event Hub connection string
func NamespaceWithRetryOptions ¶
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace. fullyQualifiedNamespace is the Event Hub namespace name (ex: myservicebus.servicebus.windows.net).
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.WebSocketConnParams) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface {
Check() error
}
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NewLinksFn ¶
type RPCError ¶
type RPCError struct { Resp *amqpwrap.RPCResponse Message string }
RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.
type RPCLinkArgs ¶
type RPCLinkArgs struct { Client amqpwrap.AMQPClient Address string LogEvent azlog.Event }
type RPCLinkOption ¶
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RecoveryKind ¶
type RecoveryKind string
RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().
const ( RecoveryKindNone RecoveryKind = "" RecoveryKindFatal RecoveryKind = "fatal" RecoveryKindLink RecoveryKind = "link" RecoveryKindConn RecoveryKind = "connection" )
func GetRecoveryKind ¶
func GetRecoveryKind(err error) RecoveryKind
GetRecoveryKind determines the recovery type for non-session based links.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
|
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. |
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
|
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. |
Package mock is a generated GoMock package.
|
Package mock is a generated GoMock package. |