Documentation ¶
Overview ¶
Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.
Index ¶
- Constants
- Variables
- func IsCancelError(err error) bool
- func IsDrainingError(err error) bool
- func IsErrNotFound(err error) bool
- func IsFatalEHError(err error) bool
- func IsQuickRecoveryError(err error) bool
- func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, ...) error
- func NewErrNonRetriable(message string) error
- func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error)
- func TransformError(err error) error
- type AMQPLink
- type AMQPReceiver
- type AMQPReceiverCloser
- type AMQPSender
- type AMQPSenderCloser
- type Closeable
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNotFound
- type LinkWithID
- type Links
- func (l *Links[LinkT]) Close(ctx context.Context) error
- func (l *Links[LinkT]) CloseLink(ctx context.Context, partitionID string) error
- func (l *Links[LinkT]) GetLink(ctx context.Context, partitionID string) (*LinkWithID[LinkT], error)
- func (l *Links[LinkT]) GetManagementLink(ctx context.Context) (LinkWithID[RPCLink], error)
- func (l *Links[LinkT]) RecoverIfNeeded(ctx context.Context, partitionID string, lwid *LinkWithID[LinkT], err error) error
- func (l *Links[LinkT]) Retry(ctx context.Context, eventName log.Event, operation string, partitionID string, ...) error
- type LinksForPartitionClient
- type Namespace
- func (ns *Namespace) Check() error
- func (ns *Namespace) Close(ctx context.Context, permanently bool) error
- func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)
- func (ns *Namespace) GetEntityAudience(entityPath string) string
- func (ns *Namespace) GetHTTPSHostURI() string
- func (ns *Namespace) GetTokenForEntity(eventHub string) (*auth.Token, error)
- func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
- func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
- func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, uint64, error)
- func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) (bool, error)
- type NamespaceForAMQPLinks
- type NamespaceForManagementOps
- type NamespaceForProducerOrConsumer
- type NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket(...) NamespaceOption
- type NamespaceWithNewAMQPLinks
- type NewLinksFn
- type RPCError
- type RPCLink
- type RPCLinkArgs
- type RPCLinkOption
- type RPCResponse
- type RecoveryKind
Constants ¶
const (
	// RPCResponseCodeLockLost comes back if you lose a message lock _or_ a session lock.
	// (NOTE: this is the one HTTP code that doesn't make intuitive sense. For all others I've just
	// used the HTTP status code instead.)
	RPCResponseCodeLockLost = http.StatusGone
)
Response codes that come back over the "RPC" style links like cbs or management.
const Version = "v0.1.1"
Version is the semantic version number
Variables ¶
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")
Functions ¶
func IsCancelError ¶
func IsDrainingError ¶
func IsErrNotFound ¶
IsErrNotFound returns true if the error argument is an ErrNotFound type
func IsFatalEHError ¶
func IsQuickRecoveryError ¶ added in v0.1.1
func NegotiateClaim ¶
func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error
NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience
func NewErrNonRetriable ¶
func NewRPCLink ¶
func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error)
NewRPCLink will build a new request response link
func TransformError ¶
TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error, which has a 'Code' field that can be used programmatically. If it's not actionable, or if it's nil, it'll just be returned.
Types ¶
type AMQPReceiver ¶
type AMQPReceiver = amqpwrap.AMQPReceiver
type AMQPReceiverCloser ¶
type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser
type AMQPSender ¶
type AMQPSender = amqpwrap.AMQPSender
type AMQPSenderCloser ¶
type AMQPSenderCloser = amqpwrap.AMQPSenderCloser
type Closeable ¶
Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.
type ErrAMQP ¶
type ErrAMQP RPCResponse
ErrAMQP indicates that the server communicated an AMQP error with a particular
type ErrConnectionClosed ¶
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func NewErrIncorrectType ¶
func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType
NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.
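The idea of passing a sample value instead of a `reflect.Type` can be sketched as follows. This is an illustration only: `incorrectType` and `newIncorrectType` are hypothetical local stand-ins, the field names and message format are assumptions, and the sketch uses the `%T` verb so the caller never touches `reflect` directly.

```go
package main

import "fmt"

// incorrectType is a local stand-in for an "unexpected type" error.
type incorrectType struct {
	Key          string
	ExpectedType string
	ActualValue  interface{}
}

// newIncorrectType records the type name of the sample 'expected' value,
// so callers pass e.g. int32(0) rather than a reflect.Type.
func newIncorrectType(key string, expected, actual interface{}) incorrectType {
	return incorrectType{
		Key:          key,
		ExpectedType: fmt.Sprintf("%T", expected),
		ActualValue:  actual,
	}
}

// Error implements the error interface; the wording is an assumption.
func (e incorrectType) Error() string {
	return fmt.Sprintf("value at %q was expected to be of type %s but was actually of type %T",
		e.Key, e.ExpectedType, e.ActualValue)
}

func main() {
	err := newIncorrectType("status-code", int32(0), "200")
	fmt.Println(err.Error())
}
```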
func (ErrIncorrectType) Error ¶
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message expected in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶
func (e ErrMissingField) Error() string
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type ErrNotFound ¶
type ErrNotFound struct {
	EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶
func (e ErrNotFound) Error() string
type LinkWithID ¶
type Links ¶
type Links[LinkT AMQPLink] struct {
	// contains filtered or unexported fields
}
func NewLinks ¶
func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]
func (*Links[LinkT]) GetManagementLink ¶
func (*Links[LinkT]) RecoverIfNeeded ¶
type LinksForPartitionClient ¶ added in v0.1.1
type LinksForPartitionClient[LinkT AMQPLink] interface {
	RecoverIfNeeded(ctx context.Context, partitionID string, lwid *LinkWithID[LinkT], err error) error
	Retry(ctx context.Context, eventName log.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error
	Close(ctx context.Context) error
}
LinksForPartitionClient are the functions that the PartitionClient uses within Links[T] (for unit testing only)
type Namespace ¶
type Namespace struct {
	FQDN          string
	TokenProvider *sbauth.TokenProvider

	// NOTE: exported only so it can be checked in a test
	RetryOptions exported.RetryOptions
	// contains filtered or unexported fields
}
Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per client.
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
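Configuration through variadic options usually follows the functional-options pattern. The sketch below assumes each option is a function that mutates the value under construction and may return an error; this page does not show the underlying type of NamespaceOption, so that is an assumption, and `namespace`, `option`, `withFQDN`, `withUserAgent`, and `newNamespace` are hypothetical local names.

```go
package main

import (
	"errors"
	"fmt"
)

// namespace is a local stand-in holding only the fields this sketch needs.
type namespace struct {
	fqdn      string
	userAgent string
}

// option mutates a namespace during construction (assumed shape).
type option func(*namespace) error

// withFQDN sets the fully qualified namespace and rejects an empty value.
func withFQDN(fqdn string) option {
	return func(ns *namespace) error {
		if fqdn == "" {
			return errors.New("fqdn must not be empty")
		}
		ns.fqdn = fqdn
		return nil
	}
}

// withUserAgent appends to the root user-agent value.
func withUserAgent(ua string) option {
	return func(ns *namespace) error {
		ns.userAgent = ns.userAgent + " " + ua
		return nil
	}
}

// newNamespace applies each option in order and stops at the first error.
func newNamespace(opts ...option) (*namespace, error) {
	ns := &namespace{userAgent: "sketch/0.0"}
	for _, opt := range opts {
		if err := opt(ns); err != nil {
			return nil, err
		}
	}
	return ns, nil
}

func main() {
	ns, err := newNamespace(withFQDN("example.servicebus.windows.net"), withUserAgent("myapp"))
	fmt.Println(ns.fqdn, ns.userAgent, err)
}
```

Returning an error from each option lets construction fail early on invalid input, which is why the constructor itself returns an error.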
func (*Namespace) Check ¶
Check returns an error if the namespace cannot be used (i.e., closed permanently), or nil otherwise.
func (*Namespace) GetAMQPClientImpl ¶
func (*Namespace) GetEntityAudience ¶
func (*Namespace) GetHTTPSHostURI ¶
func (*Namespace) GetTokenForEntity ¶
func (*Namespace) NegotiateClaim ¶
func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
NegotiateClaim performs initial authentication and starts periodic refresh of credentials. The returned func cancels the refresh goroutine.
func (*Namespace) NewAMQPSession ¶
NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.
func (*Namespace) NewRPCLink ¶
type NamespaceForAMQPLinks ¶
type NamespaceForAMQPLinks interface {
	NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
	NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
	NewRPCLink(ctx context.Context, managementPath string) (RPCLink, uint64, error)
	GetEntityAudience(entityPath string) string
	Recover(ctx context.Context, clientRevision uint64) (bool, error)
	Close(ctx context.Context, permanently bool) error
}
NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForManagementOps ¶
type NamespaceForManagementOps interface {
	NamespaceForAMQPLinks
	GetTokenForEntity(eventHub string) (*auth.Token, error)
}
type NamespaceForProducerOrConsumer ¶
type NamespaceForProducerOrConsumer = NamespaceForManagementOps
TODO: might just consolidate.
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Event Hub namespace
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in an Event Hub connection string.
func NamespaceWithRetryOptions ¶
func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption
func NamespaceWithTLSConfig ¶
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenCredential ¶
func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption
NamespaceWithTokenCredential sets the token provider on the namespace. fullyQualifiedNamespace is the Event Hub namespace name (ex: myservicebus.servicebus.windows.net).
func NamespaceWithUserAgent ¶
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶
func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.NewWebSocketConnArgs) (net.Conn, error)) NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
type NamespaceWithNewAMQPLinks ¶
type NamespaceWithNewAMQPLinks interface {
	Check() error
}
NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.
type NewLinksFn ¶
type RPCError ¶
type RPCError struct {
	Resp    *RPCResponse
	Message string
}
RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.
type RPCLink ¶
type RPCLink interface {
	Close(ctx context.Context) error
	RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
	LinkName() string
}
RPCLink is implemented by *rpc.Link
type RPCLinkArgs ¶
type RPCLinkArgs struct {
	Client   amqpwrap.AMQPClient
	Address  string
	LogEvent azlog.Event
}
type RPCLinkOption ¶
type RPCLinkOption func(link *rpcLink) error
RPCLinkOption provides a way to customize the construction of a Link
type RPCResponse ¶
type RPCResponse struct {
	// Code is the response code - these originate from Service Bus. Some
	// common values are called out below, with the RPCResponseCode* constants.
	Code        int
	Description string
	Message     *amqp.Message
}
RPCResponse is the simplified response structure from an RPC-like call.
type RecoveryKind ¶
type RecoveryKind string
RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().
const (
	RecoveryKindNone  RecoveryKind = ""
	RecoveryKindFatal RecoveryKind = "fatal"
	RecoveryKindLink  RecoveryKind = "link"
	RecoveryKindConn  RecoveryKind = "connection"
)
func GetRecoveryKind ¶
func GetRecoveryKind(err error) RecoveryKind
GetRecoveryKind determines the recovery type for non-session based links.
func GetRecoveryKindForSession ¶
func GetRecoveryKindForSession(err error) RecoveryKind
GetRecoveryKindForSession determines the recovery type for session-based links.
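A caller would typically branch on the returned kind to decide how much to rebuild. The sketch below is a hedged illustration: `recoveryKind` and its constants are local copies of the documented values, while `nextStep` and the action each kind maps to are assumptions about intent rather than the library's behavior.

```go
package main

import "fmt"

// recoveryKind is a local copy of the documented RecoveryKind string type.
type recoveryKind string

const (
	recoveryKindNone  recoveryKind = ""
	recoveryKindFatal recoveryKind = "fatal"
	recoveryKindLink  recoveryKind = "link"
	recoveryKindConn  recoveryKind = "connection"
)

// nextStep maps a recovery kind to the action a retry loop might take.
// The mapping is an assumption made for illustration.
func nextStep(kind recoveryKind) string {
	switch kind {
	case recoveryKindNone:
		return "no recovery needed"
	case recoveryKindLink:
		return "recreate the link"
	case recoveryKindConn:
		return "recreate the connection and its links"
	case recoveryKindFatal:
		return "stop retrying and return the error"
	default:
		return "unrecognized recovery kind"
	}
}

func main() {
	for _, k := range []recoveryKind{recoveryKindNone, recoveryKindLink, recoveryKindConn, recoveryKindFatal} {
		fmt.Printf("%q: %s\n", string(k), nextStep(k))
	}
}
```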
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
amqpwrap | Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types. |
auth | Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus. |
sas | Package sas provides SAS token functionality which implements TokenProvider from package auth for use with Azure Event Hubs and Service Bus. |