rabbit

package
v0.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2022 License: Apache-2.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BindingKey    = "x-knative-trigger"
	DLQBindingKey = "x-knative-dlq"
)
View Source
const (
	TriggerLabelKey = "eventing.knative.dev/trigger"
	SourceLabelKey  = "eventing.knative.dev/SourceName"
)
View Source
const (
	BrokerURLSecretKey = "brokerURL"
)

Variables

This section is empty.

Functions

func ChannelConfirm added in v0.34.0

func ChannelConfirm(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error

func ChannelQoS added in v0.34.0

func ChannelQoS(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error

func ConvertMessageToHTTPRequest added in v0.32.0

func ConvertMessageToHTTPRequest(
	ctx context.Context,
	sourceName, namespace, queueName string,
	msg *amqp.Delivery,
	req *nethttp.Request,
	logger *zap.Logger) error

func ConvertToCloudEvent added in v0.32.0

func ConvertToCloudEvent(event *cloudevents.Event, msg *amqp.Delivery, namespace, sourceName, queueName string) error

func Labels added in v0.31.0

Labels generates the labels for a RabbitMQ resource Used by exchanges, queues, and bindings created by broker, trigger, and source controllers

func MakeSecret added in v0.33.0

func MakeSecret(name, typeString, namespace, url string, owner kmeta.OwnerRefable) *corev1.Secret

MakeSecret creates the secret for Broker deployments for Rabbit Broker.

func NewBinding added in v0.31.0

func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)

func NewBrokerDLXPolicy added in v0.32.0

func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy

NewBrokerDLXPolicy configures the broker dead letter exchange for trigger queues that does not have dlx defined

func NewExchange added in v0.31.0

func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange

NewExchange returns an `exchange.rabbitmq.com` object used by trigger, broker, and source reconcilers when used by trigger and broker, exchange properties such as `durable`, autoDelete`, and `type` are hardcoded

func NewPolicy added in v0.31.0

func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy

func NewQueue added in v0.31.0

func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue

func ReconcileSecret added in v0.33.0

func ReconcileSecret(ctx context.Context, secretLister corev1listers.SecretLister, kubeClientSet kubernetes.Interface, s *corev1.Secret) error

reconcileSecret reconciles the K8s Secret 's'.

func SecretLabels added in v0.33.0

func SecretLabels(resourceName, typeString string) map[string]string

SecretLabels generates the labels present on all resources representing the secret of the given Broker.

func SecretName added in v0.33.0

func SecretName(resourceName, typeString string) string

func Watcher added in v0.34.0

func Watcher(testChannel chan bool, rabbitmqHelper RabbitMQHelper)

Types

type BindingArgs added in v0.31.0

type BindingArgs struct {
	Name                     string
	Namespace                string
	Owner                    metav1.OwnerReference
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	Vhost                    string
	Source                   string
	Destination              string
	Labels                   map[string]string
	Filters                  map[string]string
	ClusterName              string
}

type DeleteResourceArgs added in v0.34.0

type DeleteResourceArgs struct {
	Kind      interface{}
	Name      string
	Namespace string
	Owner     metav1.Object
}

type ExchangeArgs added in v0.31.0

type ExchangeArgs struct {
	Name                     string
	Namespace                string
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	RabbitMQURL              *url.URL
	Broker                   *eventingv1.Broker
	Trigger                  *eventingv1.Trigger
	Source                   *v1alpha1.RabbitmqSource
}

ExchangeArgs are the arguments to create a RabbitMQ Exchange.

type Message added in v0.32.0

type Message struct {
	Value       []byte
	Headers     map[string][]byte
	ContentType string
	// contains filtered or unexported fields
}

Message holds a rabbitmq message. this message *can* be read several times safely

func NewMessage added in v0.32.0

func NewMessage(value []byte, contentType string, headers map[string][]byte) *Message

NewMessage returns a binding.Message that holds the provided rabbitmq message components. The returned binding.Message *can* be read several times safely

func NewMessageFromDelivery added in v0.32.0

func NewMessageFromDelivery(sourceName, namespace, queueName string, msg *amqp.Delivery) *Message

NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message. The returned binding.Message *can* be read several times safely

func (*Message) Finish added in v0.32.0

func (m *Message) Finish(error) error

func (*Message) GetAttribute added in v0.32.0

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension added in v0.32.0

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary added in v0.32.0

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)

func (*Message) ReadEncoding added in v0.32.0

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured added in v0.32.0

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type QueueArgs added in v0.31.0

type QueueArgs struct {
	Name                     string
	Namespace                string
	QueueName                string
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	Owner                    metav1.OwnerReference
	Labels                   map[string]string
	DLXName                  *string
	Source                   *v1alpha1.RabbitmqSource
	BrokerUID                string
	QueueType                eventingv1alpha1.QueueType
}

type Rabbit

type Rabbit struct {
	rabbitclientset.Interface
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context) *Rabbit

func (*Rabbit) DeleteResource added in v0.34.0

func (r *Rabbit) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error

func (*Rabbit) RabbitMQURL added in v0.33.0

func (r *Rabbit) RabbitMQURL(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)

func (*Rabbit) ReconcileBinding

func (r *Rabbit) ReconcileBinding(ctx context.Context, args *BindingArgs) (Result, error)

func (*Rabbit) ReconcileBrokerDLXPolicy added in v0.32.0

func (r *Rabbit) ReconcileBrokerDLXPolicy(ctx context.Context, args *QueueArgs) (Result, error)

func (*Rabbit) ReconcileDLQPolicy added in v0.33.0

func (r *Rabbit) ReconcileDLQPolicy(ctx context.Context, args *QueueArgs) (Result, error)

func (*Rabbit) ReconcileExchange

func (r *Rabbit) ReconcileExchange(ctx context.Context, args *ExchangeArgs) (Result, error)

func (*Rabbit) ReconcileQueue

func (r *Rabbit) ReconcileQueue(ctx context.Context, args *QueueArgs) (Result, error)

type RabbitMQBadConnectionMock added in v0.34.0

type RabbitMQBadConnectionMock struct{}

func (*RabbitMQBadConnectionMock) ChannelWrapper added in v0.34.0

func (*RabbitMQBadConnectionMock) Close added in v0.34.0

func (rm *RabbitMQBadConnectionMock) Close() error

func (*RabbitMQBadConnectionMock) IsClosed added in v0.34.0

func (rm *RabbitMQBadConnectionMock) IsClosed() bool

func (*RabbitMQBadConnectionMock) NotifyClose added in v0.34.0

func (rm *RabbitMQBadConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error

type RabbitMQChannelInterface added in v0.34.0

type RabbitMQChannelInterface interface {
	NotifyClose(chan *amqp.Error) chan *amqp.Error
	Qos(int, int, bool) error
	Confirm(bool) error
	Consume(string, string, bool, bool, bool, bool, amqp.Table) (<-chan amqp.Delivery, error)
	PublishWithDeferredConfirm(string, string, bool, bool, amqp.Publishing) (*amqp.DeferredConfirmation, error)
	QueueInspect(string) (amqp.Queue, error)
}

type RabbitMQChannelMock added in v0.34.0

type RabbitMQChannelMock struct {
	NotifyCloseChannel chan *amqp.Error
	ConsumeChannel     <-chan amqp.Delivery
}

func (*RabbitMQChannelMock) Confirm added in v0.34.0

func (rm *RabbitMQChannelMock) Confirm(a bool) error

func (*RabbitMQChannelMock) Consume added in v0.34.0

func (rm *RabbitMQChannelMock) Consume(a string, b string, c bool, d bool, e bool, f bool, t amqp.Table) (<-chan amqp.Delivery, error)

func (*RabbitMQChannelMock) NotifyClose added in v0.34.0

func (rm *RabbitMQChannelMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error

func (*RabbitMQChannelMock) PublishWithDeferredConfirm added in v0.34.0

func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error)

func (*RabbitMQChannelMock) Qos added in v0.34.0

func (rm *RabbitMQChannelMock) Qos(a int, b int, c bool) error

func (*RabbitMQChannelMock) QueueInspect added in v0.34.0

func (rm *RabbitMQChannelMock) QueueInspect(string) (amqp.Queue, error)

type RabbitMQConnection added in v0.34.0

type RabbitMQConnection struct {
	// contains filtered or unexported fields
}

func NewConnection added in v0.34.0

func NewConnection(conn interface{}) *RabbitMQConnection

func (*RabbitMQConnection) ChannelWrapper added in v0.34.0

func (r *RabbitMQConnection) ChannelWrapper() (RabbitMQChannelInterface, error)

func (*RabbitMQConnection) Close added in v0.34.0

func (r *RabbitMQConnection) Close() error

func (*RabbitMQConnection) IsClosed added in v0.34.0

func (r *RabbitMQConnection) IsClosed() bool

func (*RabbitMQConnection) NotifyClose added in v0.34.0

func (r *RabbitMQConnection) NotifyClose(c chan *amqp.Error) chan *amqp.Error

type RabbitMQConnectionInterface added in v0.34.0

type RabbitMQConnectionInterface interface {
	ChannelWrapper() (RabbitMQChannelInterface, error)
	NotifyClose(chan *amqp.Error) chan *amqp.Error
	Close() error
	IsClosed() bool
}

func DialWrapper added in v0.34.0

func DialWrapper(url string) (RabbitMQConnectionInterface, error)

func ValidConnectionDial added in v0.34.0

func ValidConnectionDial(url string) (RabbitMQConnectionInterface, error)

func ValidDial added in v0.34.0

func ValidDial(url string) (RabbitMQConnectionInterface, error)

type RabbitMQConnectionMock added in v0.34.0

type RabbitMQConnectionMock struct{}

func (*RabbitMQConnectionMock) ChannelWrapper added in v0.34.0

func (rm *RabbitMQConnectionMock) ChannelWrapper() (RabbitMQChannelInterface, error)

func (*RabbitMQConnectionMock) Close added in v0.34.0

func (rm *RabbitMQConnectionMock) Close() error

func (*RabbitMQConnectionMock) IsClosed added in v0.34.0

func (rm *RabbitMQConnectionMock) IsClosed() bool

func (*RabbitMQConnectionMock) NotifyClose added in v0.34.0

func (rm *RabbitMQConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error

type RabbitMQHelper added in v0.31.0

type RabbitMQHelper struct {
	DialFunc func(string) (RabbitMQConnectionInterface, error)
	// contains filtered or unexported fields
}

func (*RabbitMQHelper) CleanupRabbitMQ added in v0.31.0

func (r *RabbitMQHelper) CleanupRabbitMQ(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger)

func (*RabbitMQHelper) CloseRabbitMQConnections added in v0.31.0

func (r *RabbitMQHelper) CloseRabbitMQConnections(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger)

func (*RabbitMQHelper) SetupRabbitMQ added in v0.31.0

func (*RabbitMQHelper) SignalRetry added in v0.31.0

func (r *RabbitMQHelper) SignalRetry(retry bool)

func (*RabbitMQHelper) WaitForRetrySignal added in v0.32.0

func (r *RabbitMQHelper) WaitForRetrySignal() bool

func (*RabbitMQHelper) WatchRabbitMQConnections added in v0.31.0

func (r *RabbitMQHelper) WatchRabbitMQConnections(
	connection RabbitMQConnectionInterface,
	channel RabbitMQChannelInterface,
	RabbitMQURL string,
	logger *zap.SugaredLogger)

type RabbitMQHelperInterface added in v0.34.0

type RabbitMQHelperInterface interface {
	SetupRabbitMQ(string, func(RabbitMQConnectionInterface, RabbitMQChannelInterface) error, *zap.SugaredLogger) (RabbitMQConnectionInterface, RabbitMQChannelInterface, error)
	WatchRabbitMQConnections(RabbitMQConnectionInterface, RabbitMQChannelInterface, string, *zap.SugaredLogger)
	SignalRetry(bool)
	WaitForRetrySignal() bool
	CloseRabbitMQConnections(RabbitMQConnectionInterface, *zap.SugaredLogger)
	CleanupRabbitMQ(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger)
}

func NewRabbitMQHelper added in v0.31.0

func NewRabbitMQHelper(
	cycleDuration time.Duration,
	retryChannel chan bool,
	dialFunc func(string) (RabbitMQConnectionInterface, error)) RabbitMQHelperInterface

type Result

type Result struct {
	Name  string
	Ready bool
}

type Service

type Service interface {
	RabbitMQURL(context.Context, *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)
	ReconcileExchange(context.Context, *ExchangeArgs) (Result, error)
	ReconcileQueue(context.Context, *QueueArgs) (Result, error)
	ReconcileBinding(context.Context, *BindingArgs) (Result, error)
	ReconcileBrokerDLXPolicy(context.Context, *QueueArgs) (Result, error)
	ReconcileDLQPolicy(context.Context, *QueueArgs) (Result, error)
	DeleteResource(ctx context.Context, args *DeleteResourceArgs) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL