internal

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2022 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.

Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.

Index

Constants

View Source
const (
	// RPCResponseCodeLockLost comes back if you lose a message lock _or_ a session lock.
	// (NOTE: this is the one HTTP code that doesn't make intuitive sense. For all others I've just
	// used the HTTP status code instead.
	RPCResponseCodeLockLost = http.StatusGone
)

Response codes that come back over the "RPC" style links like cbs or management.

View Source
const Version = "v0.1.0"

Version is the semantic version number

Variables

View Source
var ErrClientClosed = NewErrNonRetriable("client has been closed by user")

Functions

func IsCancelError

func IsCancelError(err error) bool

func IsDetachError

func IsDetachError(err error) bool

func IsDrainingError

func IsDrainingError(err error) bool

func IsErrNotFound

func IsErrNotFound(err error) bool

IsErrNotFound returns true if the error argument is an ErrNotFound type

func IsFatalEHError

func IsFatalEHError(err error) bool

func NegotiateClaim

func NegotiateClaim(ctx context.Context, audience string, conn amqpwrap.AMQPClient, provider auth.TokenProvider) error

NegotiateClaim attempts to put a token to the $cbs management endpoint to negotiate auth for the given audience

func NewErrNonRetriable

func NewErrNonRetriable(message string) error
func NewRPCLink(ctx context.Context, args RPCLinkArgs) (*rpcLink, error)

NewRPCLink will build a new request response link

func TransformError

func TransformError(err error) error

TransformError will create a proper error type that users can potentially inspect. If the error is actionable then it'll be of type exported.Error which has a 'Code' field that can be used programatically. If it's not actionable or if it's nil it'll just be returned.

Types

type AMQPLink interface {
	Close(ctx context.Context) error
	LinkName() string
}

type AMQPReceiver

type AMQPReceiver = amqpwrap.AMQPReceiver

type AMQPReceiverCloser

type AMQPReceiverCloser = amqpwrap.AMQPReceiverCloser

type AMQPSender

type AMQPSender = amqpwrap.AMQPSender

type AMQPSenderCloser

type AMQPSenderCloser = amqpwrap.AMQPSenderCloser

type Closeable

type Closeable interface {
	Close(ctx context.Context) error
}

Closeable is implemented by pretty much any AMQP link/client including our own higher level Receiver/Sender.

type ErrAMQP

type ErrAMQP RPCResponse

ErrAMQP indicates that the server communicated an AMQP error with a particular

func (ErrAMQP) Error

func (e ErrAMQP) Error() string

type ErrConnectionClosed

type ErrConnectionClosed string

ErrConnectionClosed indicates that the connection has been closed.

func (ErrConnectionClosed) Error

func (e ErrConnectionClosed) Error() string

type ErrIncorrectType

type ErrIncorrectType struct {
	Key          string
	ExpectedType reflect.Type
	ActualValue  interface{}
}

ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func NewErrIncorrectType

func NewErrIncorrectType(key string, expected, actual interface{}) ErrIncorrectType

NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as 'expected'.

func (ErrIncorrectType) Error

func (e ErrIncorrectType) Error() string

type ErrMalformedMessage

type ErrMalformedMessage string

ErrMalformedMessage indicates that a message was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.

func (ErrMalformedMessage) Error

func (e ErrMalformedMessage) Error() string

type ErrMissingField

type ErrMissingField string

ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.

func (ErrMissingField) Error

func (e ErrMissingField) Error() string

type ErrNoMessages

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.

func (ErrNoMessages) Error

func (e ErrNoMessages) Error() string

type ErrNotFound

type ErrNotFound struct {
	EntityPath string
}

ErrNotFound is returned when an entity is not found (404)

func (ErrNotFound) Error

func (e ErrNotFound) Error() string

type LinkWithID

type LinkWithID[LinkT AMQPLink] struct {
	// ConnID is an arbitrary (but unique) integer that represents the
	// current connection. This comes back from the Namespace, anytime
	// it hands back a connection.
	ConnID uint64

	// Link will be an amqp.Receiver or amqp.Sender link.
	Link LinkT
}
type Links[LinkT AMQPLink] struct {
	// contains filtered or unexported fields
}
func NewLinks[LinkT AMQPLink](ns NamespaceForAMQPLinks, managementPath string, entityPathFn func(partitionID string) string, newLinkFn NewLinksFn[LinkT]) *Links[LinkT]

func (*Links[LinkT]) Close

func (l *Links[LinkT]) Close(ctx context.Context) error
func (l *Links[LinkT]) CloseLink(ctx context.Context, partitionID string) error
func (l *Links[LinkT]) GetLink(ctx context.Context, partitionID string) (*LinkWithID[LinkT], error)
func (l *Links[LinkT]) GetManagementLink(ctx context.Context) (LinkWithID[RPCLink], error)

func (*Links[LinkT]) RecoverIfNeeded

func (l *Links[LinkT]) RecoverIfNeeded(ctx context.Context, partitionID string, lwid *LinkWithID[LinkT], err error) error

func (*Links[LinkT]) Retry

func (l *Links[LinkT]) Retry(ctx context.Context, eventName log.Event, operation string, partitionID string, retryOptions exported.RetryOptions, fn func(ctx context.Context, lwid LinkWithID[LinkT]) error) error

type Namespace

type Namespace struct {
	FQDN          string
	TokenProvider *sbauth.TokenProvider

	// NOTE: exported only so it can be checked in a test
	RetryOptions exported.RetryOptions
	// contains filtered or unexported fields
}

Namespace is an abstraction over an amqp.Client, allowing us to hold onto a single instance of a connection per client..

func NewNamespace

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)

func (*Namespace) Check

func (ns *Namespace) Check() error

Check returns an error if the namespace cannot be used (ie, closed permanently), or nil otherwise.

func (*Namespace) Close

func (ns *Namespace) Close(ctx context.Context, permanently bool) error

Close closes the current cached client.

func (*Namespace) GetAMQPClientImpl

func (ns *Namespace) GetAMQPClientImpl(ctx context.Context) (amqpwrap.AMQPClient, uint64, error)

func (*Namespace) GetEntityAudience

func (ns *Namespace) GetEntityAudience(entityPath string) string

func (*Namespace) GetHTTPSHostURI

func (ns *Namespace) GetHTTPSHostURI() string

func (*Namespace) GetTokenForEntity

func (ns *Namespace) GetTokenForEntity(eventHub string) (*auth.Token, error)

func (*Namespace) NegotiateClaim

func (ns *Namespace) NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)

negotiateClaim performs initial authentication and starts periodic refresh of credentials. the returned func is to cancel() the refresh goroutine.

func (*Namespace) NewAMQPSession

func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)

NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client. Returns a closeable AMQP session and the current client revision.

func (ns *Namespace) NewRPCLink(ctx context.Context, managementPath string) (RPCLink, uint64, error)

func (*Namespace) Recover

func (ns *Namespace) Recover(ctx context.Context, theirConnID uint64) (bool, error)

Recover destroys the currently held AMQP connection and recreates it, if needed. If a new is actually created (rather than just cached) then the returned bool will be true. Any links that were created from the original connection will need to be recreated.

type NamespaceForAMQPLinks interface {
	NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
	NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
	NewRPCLink(ctx context.Context, managementPath string) (RPCLink, uint64, error)
	GetEntityAudience(entityPath string) string
	Recover(ctx context.Context, clientRevision uint64) (bool, error)
	Close(ctx context.Context, permanently bool) error
}

NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.

type NamespaceForManagementOps

type NamespaceForManagementOps interface {
	NamespaceForAMQPLinks
	GetTokenForEntity(eventHub string) (*auth.Token, error)
}

type NamespaceForProducerOrConsumer

type NamespaceForProducerOrConsumer = NamespaceForManagementOps

TODO: might just consolidate.

type NamespaceOption

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Event Hub namespace

func NamespaceWithConnectionString

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in a Event Hub connection string

func NamespaceWithRetryOptions

func NamespaceWithRetryOptions(retryOptions exported.RetryOptions) NamespaceOption

func NamespaceWithTLSConfig

func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption

NamespaceWithTLSConfig appends to the TLS config.

func NamespaceWithTokenCredential

func NamespaceWithTokenCredential(fullyQualifiedNamespace string, tokenCredential azcore.TokenCredential) NamespaceOption

NamespaceWithTokenCredential sets the token provider on the namespace fullyQualifiedNamespace is the Event Hub namespace name (ex: myservicebus.servicebus.windows.net)

func NamespaceWithUserAgent

func NamespaceWithUserAgent(userAgent string) NamespaceOption

NamespaceWithUserAgent appends to the root user-agent value.

func NamespaceWithWebSocket

func NamespaceWithWebSocket(newWebSocketConn func(ctx context.Context, args exported.NewWebSocketConnArgs) (net.Conn, error)) NamespaceOption

NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://

type NamespaceWithNewAMQPLinks interface {
	Check() error
}

NamespaceWithNewAMQPLinks is the Namespace surface for consumers of AMQPLinks.

type NewLinksFn

type NewLinksFn[LinkT AMQPLink] func(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (LinkT, error)

type RPCError

type RPCError struct {
	Resp    *RPCResponse
	Message string
}

RPCError is an error from an RPCLink. RPCLinks are used for communication with the $management and $cbs links.

func (RPCError) Error

func (e RPCError) Error() string

Error is a string representation of the error.

func (RPCError) RPCCode

func (e RPCError) RPCCode() int

RPCCode is the code that comes back in the rpc response. This code is intended for programs toreact to programatically.

type RPCLink interface {
	Close(ctx context.Context) error
	RPC(ctx context.Context, msg *amqp.Message) (*RPCResponse, error)
	LinkName() string
}

RPCLink is implemented by *rpc.Link

type RPCLinkArgs

type RPCLinkArgs struct {
	Client   amqpwrap.AMQPClient
	Address  string
	LogEvent azlog.Event
}

type RPCLinkOption

type RPCLinkOption func(link *rpcLink) error

RPCLinkOption provides a way to customize the construction of a Link

type RPCResponse

type RPCResponse struct {
	// Code is the response code - these originate from Service Bus. Some
	// common values are called out below, with the RPCResponseCode* constants.
	Code        int
	Description string
	Message     *amqp.Message
}

RPCResponse is the simplified response structure from an RPC like call

type RecoveryKind

type RecoveryKind string

RecoveryKind dictates what kind of recovery is possible. Used with GetRecoveryKind().

const (
	RecoveryKindNone  RecoveryKind = ""
	RecoveryKindFatal RecoveryKind = "fatal"
	RecoveryKindLink  RecoveryKind = "link"
	RecoveryKindConn  RecoveryKind = "connection"
)

func GetRecoveryKind

func GetRecoveryKind(err error) RecoveryKind

GetRecoveryKind determines the recovery type for non-session based links.

func GetRecoveryKindForSession

func GetRecoveryKindForSession(err error) RecoveryKind

GetRecoveryKindForSession determines the recovery type for session-based links.

Directories

Path Synopsis
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
eh
Package sas provides SAS token functionality which implements TokenProvider from package auth for use with Azure Event Hubs and Service Bus.
Package sas provides SAS token functionality which implements TokenProvider from package auth for use with Azure Event Hubs and Service Bus.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL