Documentation ¶
Index ¶
- Constants
- func ConvertMessageToHTTPRequest(ctx context.Context, sourceName, namespace, queueName string, ...) error
- func ConvertToCloudEvent(event *cloudevents.Event, msg wabbit.Delivery, ...) error
- func Labels(b *eventingv1.Broker, t *eventingv1.Trigger, s *v1alpha1.RabbitmqSource) map[string]string
- func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)
- func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy
- func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange
- func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy
- func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue
- type BindingArgs
- type ExchangeArgs
- type Message
- func (m *Message) Finish(error) error
- func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
- func (m *Message) GetExtension(name string) interface{}
- func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)
- func (m *Message) ReadEncoding() binding.Encoding
- func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error
- type QueueArgs
- type Rabbit
- func (r *Rabbit) ReconcileBinding(ctx context.Context, args *BindingArgs) (Result, error)
- func (r *Rabbit) ReconcileBrokerDLXPolicy(ctx context.Context, args *QueueArgs) (Result, error)
- func (r *Rabbit) ReconcileExchange(ctx context.Context, args *ExchangeArgs) (Result, error)
- func (r *Rabbit) ReconcileQueue(ctx context.Context, args *QueueArgs) (Result, error)
- type RabbitMQHelper
- func (r *RabbitMQHelper) CleanupRabbitMQ(connection *amqp.Connection, logger *zap.SugaredLogger)
- func (r *RabbitMQHelper) CloseRabbitMQConnections(connection *amqp.Connection, logger *zap.SugaredLogger)
- func (r *RabbitMQHelper) SetupRabbitMQ(RabbitMQURL string, logger *zap.SugaredLogger) (*amqp.Connection, *amqp.Channel, error)
- func (r *RabbitMQHelper) SignalRetry(retry bool)
- func (r *RabbitMQHelper) WaitForRetrySignal() bool
- func (r *RabbitMQHelper) WatchRabbitMQConnections(connection *amqp.Connection, channel *amqp.Channel, RabbitMQURL string, ...)
- type Result
- type Service
Constants ¶
const ( BindingKey = "x-knative-trigger" DLQBindingKey = "x-knative-dlq" )
const ( TriggerLabelKey = "eventing.knative.dev/trigger" SourceLabelKey = "eventing.knative.dev/SourceName" )
Variables ¶
This section is empty.
Functions ¶
func ConvertMessageToHTTPRequest ¶ added in v0.32.0
func ConvertToCloudEvent ¶ added in v0.32.0
func Labels ¶ added in v0.31.0
func Labels(b *eventingv1.Broker, t *eventingv1.Trigger, s *v1alpha1.RabbitmqSource) map[string]string
Labels generates the labels for a RabbitMQ resource. It is used by exchanges, queues, and bindings created by the broker, trigger, and source controllers.
func NewBinding ¶ added in v0.31.0
func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)
func NewBrokerDLXPolicy ¶ added in v0.32.0
func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy
NewBrokerDLXPolicy configures the broker dead letter exchange for trigger queues that do not have a DLX defined.
func NewExchange ¶ added in v0.31.0
func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange
NewExchange returns an `exchange.rabbitmq.com` object used by trigger, broker, and source reconcilers. When used by trigger and broker, exchange properties such as `durable`, `autoDelete`, and `type` are hardcoded.
func NewPolicy ¶ added in v0.31.0
func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy
func NewQueue ¶ added in v0.31.0
func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue
Types ¶
type BindingArgs ¶ added in v0.31.0
type BindingArgs struct { Name string Namespace string Owner metav1.OwnerReference RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference Vhost string Source string Destination string Labels map[string]string Filters map[string]string ClusterName string }
type ExchangeArgs ¶ added in v0.31.0
type ExchangeArgs struct { Name string Namespace string RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference RabbitMQURL *url.URL Broker *eventingv1.Broker Trigger *eventingv1.Trigger Source *v1alpha1.RabbitmqSource }
ExchangeArgs are the arguments to create a RabbitMQ Exchange.
type Message ¶ added in v0.32.0
type Message struct { Value []byte Headers map[string][]byte ContentType string // contains filtered or unexported fields }
Message holds a RabbitMQ message. This message *can* be read several times safely.
func NewMessage ¶ added in v0.32.0
NewMessage returns a binding.Message that holds the provided RabbitMQ message components. The returned binding.Message *can* be read several times safely.
func NewMessageFromDelivery ¶ added in v0.32.0
NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message. The returned binding.Message *can* be read several times safely.
func (*Message) GetAttribute ¶ added in v0.32.0
func (*Message) GetExtension ¶ added in v0.32.0
func (*Message) ReadBinary ¶ added in v0.32.0
func (*Message) ReadEncoding ¶ added in v0.32.0
func (*Message) ReadStructured ¶ added in v0.32.0
type QueueArgs ¶ added in v0.31.0
type QueueArgs struct { Name string Namespace string QueueName string RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference Owner metav1.OwnerReference Labels map[string]string DLXName *string Source *v1alpha1.RabbitmqSource BrokerUID string }
type Rabbit ¶
type Rabbit struct {
rabbitclientset.Interface
}
func (*Rabbit) ReconcileBinding ¶
func (*Rabbit) ReconcileBrokerDLXPolicy ¶ added in v0.32.0
func (*Rabbit) ReconcileExchange ¶
type RabbitMQHelper ¶ added in v0.31.0
type RabbitMQHelper struct {
// contains filtered or unexported fields
}
func NewRabbitMQHelper ¶ added in v0.31.0
func NewRabbitMQHelper(cycleDuration time.Duration, retryChannel chan bool) *RabbitMQHelper
func (*RabbitMQHelper) CleanupRabbitMQ ¶ added in v0.31.0
func (r *RabbitMQHelper) CleanupRabbitMQ(connection *amqp.Connection, logger *zap.SugaredLogger)
func (*RabbitMQHelper) CloseRabbitMQConnections ¶ added in v0.31.0
func (r *RabbitMQHelper) CloseRabbitMQConnections(connection *amqp.Connection, logger *zap.SugaredLogger)
func (*RabbitMQHelper) SetupRabbitMQ ¶ added in v0.31.0
func (r *RabbitMQHelper) SetupRabbitMQ( RabbitMQURL string, logger *zap.SugaredLogger) (*amqp.Connection, *amqp.Channel, error)
func (*RabbitMQHelper) SignalRetry ¶ added in v0.31.0
func (r *RabbitMQHelper) SignalRetry(retry bool)
func (*RabbitMQHelper) WaitForRetrySignal ¶ added in v0.32.0
func (r *RabbitMQHelper) WaitForRetrySignal() bool
func (*RabbitMQHelper) WatchRabbitMQConnections ¶ added in v0.31.0
func (r *RabbitMQHelper) WatchRabbitMQConnections( connection *amqp.Connection, channel *amqp.Channel, RabbitMQURL string, logger *zap.SugaredLogger)