rabbit

package
v0.32.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2022 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BindingKey    = "x-knative-trigger"
	DLQBindingKey = "x-knative-dlq"
)
View Source
const (
	TriggerLabelKey = "eventing.knative.dev/trigger"
	SourceLabelKey  = "eventing.knative.dev/SourceName"
)

Variables

This section is empty.

Functions

func ConvertMessageToHTTPRequest added in v0.32.0

func ConvertMessageToHTTPRequest(
	ctx context.Context,
	sourceName, namespace, queueName string,
	msg wabbit.Delivery,
	req *nethttp.Request,
	logger *zap.Logger) error

func ConvertToCloudEvent added in v0.32.0

func ConvertToCloudEvent(event *cloudevents.Event, msg wabbit.Delivery, namespace, sourceName, queueName string) error

func Labels added in v0.31.0

Labels generates the labels for a RabbitMQ resource. It is used by exchanges, queues, and bindings created by the broker, trigger, and source controllers.

func NewBinding added in v0.31.0

func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)

func NewBrokerDLXPolicy added in v0.32.0

func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy

NewBrokerDLXPolicy configures the broker dead letter exchange (DLX) for trigger queues that do not have a DLX defined.

func NewExchange added in v0.31.0

func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange

NewExchange returns an `exchange.rabbitmq.com` object used by trigger, broker, and source reconcilers. When used by trigger and broker, exchange properties such as `durable`, `autoDelete`, and `type` are hardcoded.

func NewPolicy added in v0.31.0

func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy

func NewQueue added in v0.31.0

func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue

Types

type BindingArgs added in v0.31.0

type BindingArgs struct {
	Name                     string
	Namespace                string
	Owner                    metav1.OwnerReference
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	Vhost                    string
	Source                   string
	Destination              string
	Labels                   map[string]string
	Filters                  map[string]string
	ClusterName              string
}

type ExchangeArgs added in v0.31.0

type ExchangeArgs struct {
	Name                     string
	Namespace                string
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	RabbitMQURL              *url.URL
	Broker                   *eventingv1.Broker
	Trigger                  *eventingv1.Trigger
	Source                   *v1alpha1.RabbitmqSource
}

ExchangeArgs are the arguments to create a RabbitMQ Exchange.

type Message added in v0.32.0

type Message struct {
	Value       []byte
	Headers     map[string][]byte
	ContentType string
	// contains filtered or unexported fields
}

Message holds a RabbitMQ message. This message *can* be read several times safely.

func NewMessage added in v0.32.0

func NewMessage(value []byte, contentType string, headers map[string][]byte) *Message

NewMessage returns a binding.Message that holds the provided RabbitMQ message components. The returned binding.Message *can* be read several times safely.

func NewMessageFromDelivery added in v0.32.0

func NewMessageFromDelivery(sourceName, namespace, queueName string, msg wabbit.Delivery) *Message

NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message. The returned binding.Message *can* be read several times safely.

func (*Message) Finish added in v0.32.0

func (m *Message) Finish(error) error

func (*Message) GetAttribute added in v0.32.0

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension added in v0.32.0

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary added in v0.32.0

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)

func (*Message) ReadEncoding added in v0.32.0

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured added in v0.32.0

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type QueueArgs added in v0.31.0

type QueueArgs struct {
	Name                     string
	Namespace                string
	QueueName                string
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	Owner                    metav1.OwnerReference
	Labels                   map[string]string
	DLXName                  *string
	Source                   *v1alpha1.RabbitmqSource
	BrokerUID                string
}

type Rabbit

type Rabbit struct {
	rabbitclientset.Interface
}

func New

func New(ctx context.Context) *Rabbit

func (*Rabbit) ReconcileBinding

func (r *Rabbit) ReconcileBinding(ctx context.Context, args *BindingArgs) (Result, error)

func (*Rabbit) ReconcileBrokerDLXPolicy added in v0.32.0

func (r *Rabbit) ReconcileBrokerDLXPolicy(ctx context.Context, args *QueueArgs) (Result, error)

func (*Rabbit) ReconcileExchange

func (r *Rabbit) ReconcileExchange(ctx context.Context, args *ExchangeArgs) (Result, error)

func (*Rabbit) ReconcileQueue

func (r *Rabbit) ReconcileQueue(ctx context.Context, args *QueueArgs) (Result, error)

type RabbitMQHelper added in v0.31.0

type RabbitMQHelper struct {
	// contains filtered or unexported fields
}

func NewRabbitMQHelper added in v0.31.0

func NewRabbitMQHelper(cycleDuration time.Duration, retryChannel chan bool) *RabbitMQHelper

func (*RabbitMQHelper) CleanupRabbitMQ added in v0.31.0

func (r *RabbitMQHelper) CleanupRabbitMQ(connection *amqp.Connection, logger *zap.SugaredLogger)

func (*RabbitMQHelper) CloseRabbitMQConnections added in v0.31.0

func (r *RabbitMQHelper) CloseRabbitMQConnections(connection *amqp.Connection, logger *zap.SugaredLogger)

func (*RabbitMQHelper) SetupRabbitMQ added in v0.31.0

func (r *RabbitMQHelper) SetupRabbitMQ(
	RabbitMQURL string,
	logger *zap.SugaredLogger) (*amqp.Connection, *amqp.Channel, error)

func (*RabbitMQHelper) SignalRetry added in v0.31.0

func (r *RabbitMQHelper) SignalRetry(retry bool)

func (*RabbitMQHelper) WaitForRetrySignal added in v0.32.0

func (r *RabbitMQHelper) WaitForRetrySignal() bool

func (*RabbitMQHelper) WatchRabbitMQConnections added in v0.31.0

func (r *RabbitMQHelper) WatchRabbitMQConnections(
	connection *amqp.Connection,
	channel *amqp.Channel,
	RabbitMQURL string,
	logger *zap.SugaredLogger)

type Result

type Result struct {
	Name  string
	Ready bool
}

type Service

type Service interface {
	ReconcileExchange(context.Context, *ExchangeArgs) (Result, error)
	ReconcileQueue(context.Context, *QueueArgs) (Result, error)
	ReconcileBinding(context.Context, *BindingArgs) (Result, error)
	ReconcileBrokerDLXPolicy(context.Context, *QueueArgs) (Result, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL