Documentation ¶
Index ¶
- Constants
- func ChannelConfirm(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
- func ChannelQoS(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
- func CloudEventToRabbitMQMessage(event *cloudevents.Event, tp, ts string) *amqp.Publishing
- func ConvertMessageToHTTPRequest(ctx context.Context, sourceName, namespace, queueName string, ...) error
- func ConvertToCloudEvent(event *cloudevents.Event, msg *amqp.Delivery, ...) error
- func Labels(b *eventingv1.Broker, t *eventingv1.Trigger, s *v1alpha1.RabbitmqSource) map[string]string
- func MakeSecret(name, typeString, namespace, url string, owner kmeta.OwnerRefable) *corev1.Secret
- func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)
- func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy
- func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange
- func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy
- func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue
- func ReconcileSecret(ctx context.Context, secretLister corev1listers.SecretLister, ...) error
- func SecretLabels(resourceName, typeString string) map[string]string
- func SecretName(resourceName, typeString string) string
- func VHostHandler(broker string, vhost string) string
- type BindingArgs
- type DeleteResourceArgs
- type ExchangeArgs
- type Message
- func (m *Message) Finish(error) error
- func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
- func (m *Message) GetExtension(name string) interface{}
- func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)
- func (m *Message) ReadEncoding() binding.Encoding
- func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error
- type QueueArgs
- type Rabbit
- func (r *Rabbit) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error
- func (r *Rabbit) GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error)
- func (r *Rabbit) RabbitMQURL(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)
- func (r *Rabbit) ReconcileBinding(ctx context.Context, args *BindingArgs) (Result, error)
- func (r *Rabbit) ReconcileBrokerDLXPolicy(ctx context.Context, args *QueueArgs) (Result, error)
- func (r *Rabbit) ReconcileDLQPolicy(ctx context.Context, args *QueueArgs) (Result, error)
- func (r *Rabbit) ReconcileExchange(ctx context.Context, args *ExchangeArgs) (Result, error)
- func (r *Rabbit) ReconcileQueue(ctx context.Context, args *QueueArgs) (Result, error)
- type RabbitMQBadConnectionMock
- type RabbitMQChannelInterface
- type RabbitMQChannelMock
- func (rm *RabbitMQChannelMock) Confirm(a bool) error
- func (rm *RabbitMQChannelMock) Consume(a string, b string, c bool, d bool, e bool, f bool, t amqp.Table) (<-chan amqp.Delivery, error)
- func (rm *RabbitMQChannelMock) IsClosed() bool
- func (rm *RabbitMQChannelMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error
- func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error)
- func (rm *RabbitMQChannelMock) Qos(a int, b int, c bool) error
- func (rm *RabbitMQChannelMock) QueueInspect(string) (amqp.Queue, error)
- type RabbitMQConnection
- type RabbitMQConnectionHandler
- type RabbitMQConnectionInterface
- type RabbitMQConnectionMock
- type RabbitMQConnectionWrapperInterface
- type RabbitMQConnectionsHandlerInterface
- type Result
- type Service
Constants ¶
const ( BindingKey = "x-knative-trigger" DLQBindingKey = "x-knative-dlq" )
const ( TriggerLabelKey = "eventing.knative.dev/trigger" SourceLabelKey = "eventing.knative.dev/SourceName" )
const (
BrokerURLSecretKey = "brokerURL"
)
const CA_SECRET_KEYNAME = "caSecretName"
Variables ¶
This section is empty.
Functions ¶
func ChannelConfirm ¶ added in v0.34.0
func ChannelConfirm(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
func ChannelQoS ¶ added in v0.34.0
func ChannelQoS(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
func CloudEventToRabbitMQMessage ¶ added in v0.35.0
func CloudEventToRabbitMQMessage(event *cloudevents.Event, tp, ts string) *amqp.Publishing
func ConvertMessageToHTTPRequest ¶ added in v0.32.0
func ConvertToCloudEvent ¶ added in v0.32.0
func Labels ¶ added in v0.31.0
func Labels(b *eventingv1.Broker, t *eventingv1.Trigger, s *v1alpha1.RabbitmqSource) map[string]string
Labels generates the labels for a RabbitMQ resource. Used by exchanges, queues, and bindings created by the broker, trigger, and source controllers.
func MakeSecret ¶ added in v0.33.0
func MakeSecret(name, typeString, namespace, url string, owner kmeta.OwnerRefable) *corev1.Secret
MakeSecret creates the secret for Broker deployments for Rabbit Broker.
func NewBinding ¶ added in v0.31.0
func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)
func NewBrokerDLXPolicy ¶ added in v0.32.0
func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy
NewBrokerDLXPolicy configures the broker dead letter exchange for trigger queues that do not have a DLX defined.
func NewExchange ¶ added in v0.31.0
func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange
NewExchange returns an `exchange.rabbitmq.com` object used by the trigger, broker, and source reconcilers. When used by trigger and broker, exchange properties such as `durable`, `autoDelete`, and `type` are hardcoded.
func NewPolicy ¶ added in v0.31.0
func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy
func NewQueue ¶ added in v0.31.0
func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue
func ReconcileSecret ¶ added in v0.33.0
func ReconcileSecret(ctx context.Context, secretLister corev1listers.SecretLister, kubeClientSet kubernetes.Interface, s *corev1.Secret) error
ReconcileSecret reconciles the K8s Secret 's'.
func SecretLabels ¶ added in v0.33.0
SecretLabels generates the labels present on all resources representing the secret of the given Broker.
func SecretName ¶ added in v0.33.0
func VHostHandler ¶ added in v0.35.0
Types ¶
type BindingArgs ¶ added in v0.31.0
type BindingArgs struct { Name string Namespace string Owner metav1.OwnerReference RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference RabbitMQVhost string Source string Destination string Labels map[string]string Filters map[string]string ClusterName string }
type DeleteResourceArgs ¶ added in v0.34.0
type ExchangeArgs ¶ added in v0.31.0
type ExchangeArgs struct { Name string Namespace string RabbitMQVhost string RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference RabbitMQURL *url.URL Broker *eventingv1.Broker Trigger *eventingv1.Trigger Source *v1alpha1.RabbitmqSource }
ExchangeArgs are the arguments to create a RabbitMQ Exchange.
type Message ¶ added in v0.32.0
type Message struct { Value []byte Headers map[string][]byte ContentType string // contains filtered or unexported fields }
Message holds a RabbitMQ message. This message *can* be read several times safely.
func NewMessage ¶ added in v0.32.0
NewMessage returns a binding.Message that holds the provided RabbitMQ message components. The returned binding.Message *can* be read several times safely.
func NewMessageFromDelivery ¶ added in v0.32.0
NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message. The returned binding.Message *can* be read several times safely.
func (*Message) GetAttribute ¶ added in v0.32.0
func (*Message) GetExtension ¶ added in v0.32.0
func (*Message) ReadBinary ¶ added in v0.32.0
func (*Message) ReadEncoding ¶ added in v0.32.0
func (*Message) ReadStructured ¶ added in v0.32.0
type QueueArgs ¶ added in v0.31.0
type QueueArgs struct { Name string Namespace string RabbitMQVhost string QueueName string RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference Owner metav1.OwnerReference Labels map[string]string DLXName *string Source *v1alpha1.RabbitmqSource BrokerUID string QueueType eventingv1alpha1.QueueType }
type Rabbit ¶
type Rabbit struct { rabbitclientset.Interface // contains filtered or unexported fields }
func (*Rabbit) DeleteResource ¶ added in v0.34.0
func (r *Rabbit) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error
func (*Rabbit) GetRabbitMQCASecret ¶ added in v0.35.0
func (r *Rabbit) GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error)
func (*Rabbit) RabbitMQURL ¶ added in v0.33.0
func (r *Rabbit) RabbitMQURL(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)
func (*Rabbit) ReconcileBinding ¶
func (*Rabbit) ReconcileBrokerDLXPolicy ¶ added in v0.32.0
func (*Rabbit) ReconcileDLQPolicy ¶ added in v0.33.0
func (*Rabbit) ReconcileExchange ¶
type RabbitMQBadConnectionMock ¶ added in v0.34.0
type RabbitMQBadConnectionMock struct{}
func (*RabbitMQBadConnectionMock) ChannelWrapper ¶ added in v0.34.0
func (rm *RabbitMQBadConnectionMock) ChannelWrapper() (RabbitMQChannelInterface, error)
func (*RabbitMQBadConnectionMock) Close ¶ added in v0.34.0
func (rm *RabbitMQBadConnectionMock) Close() error
func (*RabbitMQBadConnectionMock) IsClosed ¶ added in v0.34.0
func (rm *RabbitMQBadConnectionMock) IsClosed() bool
type RabbitMQChannelInterface ¶ added in v0.34.0
type RabbitMQChannelInterface interface { IsClosed() bool NotifyClose(chan *amqp091.Error) chan *amqp091.Error Qos(int, int, bool) error Confirm(bool) error Consume(string, string, bool, bool, bool, bool, amqp091.Table) (<-chan amqp091.Delivery, error) PublishWithDeferredConfirm(string, string, bool, bool, amqp091.Publishing) (*amqp091.DeferredConfirmation, error) QueueInspect(string) (amqp091.Queue, error) }
type RabbitMQChannelMock ¶ added in v0.34.0
type RabbitMQChannelMock struct { NotifyCloseChannel chan *amqp.Error ConsumeChannel <-chan amqp.Delivery }
func (*RabbitMQChannelMock) Confirm ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) Confirm(a bool) error
func (*RabbitMQChannelMock) IsClosed ¶ added in v0.36.0
func (rm *RabbitMQChannelMock) IsClosed() bool
func (*RabbitMQChannelMock) NotifyClose ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error
func (*RabbitMQChannelMock) PublishWithDeferredConfirm ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error)
func (*RabbitMQChannelMock) Qos ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) Qos(a int, b int, c bool) error
func (*RabbitMQChannelMock) QueueInspect ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) QueueInspect(string) (amqp.Queue, error)
type RabbitMQConnection ¶ added in v0.34.0
type RabbitMQConnection struct {
// contains filtered or unexported fields
}
func NewConnection ¶ added in v0.34.0
func NewConnection(conn interface{}) *RabbitMQConnection
func (*RabbitMQConnection) ChannelWrapper ¶ added in v0.34.0
func (r *RabbitMQConnection) ChannelWrapper() (RabbitMQChannelInterface, error)
func (*RabbitMQConnection) Close ¶ added in v0.34.0
func (r *RabbitMQConnection) Close() error
func (*RabbitMQConnection) IsClosed ¶ added in v0.34.0
func (r *RabbitMQConnection) IsClosed() bool
func (*RabbitMQConnection) NotifyClose ¶ added in v0.34.0
func (r *RabbitMQConnection) NotifyClose(c chan *amqp091.Error) chan *amqp091.Error
type RabbitMQConnectionHandler ¶ added in v0.36.0
type RabbitMQConnectionHandler struct { Connection RabbitMQConnectionWrapperInterface Channel RabbitMQChannelInterface // contains filtered or unexported fields }
func (*RabbitMQConnectionHandler) GetChannel ¶ added in v0.36.0
func (r *RabbitMQConnectionHandler) GetChannel() RabbitMQChannelInterface
func (*RabbitMQConnectionHandler) GetConnection ¶ added in v0.36.0
func (r *RabbitMQConnectionHandler) GetConnection() RabbitMQConnectionInterface
func (*RabbitMQConnectionHandler) Setup ¶ added in v0.36.0
func (r *RabbitMQConnectionHandler) Setup( ctx context.Context, rabbitMQURL string, configFunction func(RabbitMQConnectionInterface, RabbitMQChannelInterface) error, dialFunc func(string) (RabbitMQConnectionWrapperInterface, error))
type RabbitMQConnectionInterface ¶ added in v0.34.0
type RabbitMQConnectionMock ¶ added in v0.34.0
func (*RabbitMQConnectionMock) ChannelWrapper ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) ChannelWrapper() (RabbitMQChannelInterface, error)
func (*RabbitMQConnectionMock) Close ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) Close() error
func (*RabbitMQConnectionMock) IsClosed ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) IsClosed() bool
func (*RabbitMQConnectionMock) NotifyClose ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error
type RabbitMQConnectionWrapperInterface ¶ added in v0.36.0
type RabbitMQConnectionWrapperInterface interface { RabbitMQConnectionInterface ChannelWrapper() (RabbitMQChannelInterface, error) }
func BadChannelDial ¶ added in v0.36.0
func BadChannelDial(url string) (RabbitMQConnectionWrapperInterface, error)
func BadConnectionDial ¶ added in v0.36.0
func BadConnectionDial(url string) (RabbitMQConnectionWrapperInterface, error)
func DialWrapper ¶ added in v0.34.0
func DialWrapper(url string) (RabbitMQConnectionWrapperInterface, error)
func ValidDial ¶ added in v0.34.0
func ValidDial(url string) (RabbitMQConnectionWrapperInterface, error)
type RabbitMQConnectionsHandlerInterface ¶ added in v0.36.0
type RabbitMQConnectionsHandlerInterface interface { GetConnection() RabbitMQConnectionInterface GetChannel() RabbitMQChannelInterface Setup(context.Context, string, func(RabbitMQConnectionInterface, RabbitMQChannelInterface) error, func(string) (RabbitMQConnectionWrapperInterface, error)) }
func NewRabbitMQConnectionHandler ¶ added in v0.36.0
func NewRabbitMQConnectionHandler(cycleDuration time.Duration, logger *zap.SugaredLogger) RabbitMQConnectionsHandlerInterface
type Service ¶
type Service interface { RabbitMQURL(context.Context, *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error) ReconcileExchange(context.Context, *ExchangeArgs) (Result, error) ReconcileQueue(context.Context, *QueueArgs) (Result, error) ReconcileBinding(context.Context, *BindingArgs) (Result, error) ReconcileBrokerDLXPolicy(context.Context, *QueueArgs) (Result, error) ReconcileDLQPolicy(context.Context, *QueueArgs) (Result, error) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error) }