Documentation ¶
Index ¶
- Constants
- func ChannelConfirm(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
- func ChannelQoS(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
- func CloudEventToRabbitMQMessage(event *cloudevents.Event, tp, ts string) *amqp.Publishing
- func ConvertMessageToHTTPRequest(ctx context.Context, sourceName, namespace, queueName string, ...) error
- func ConvertToCloudEvent(event *cloudevents.Event, msg *amqp.Delivery, ...) error
- func Labels(b *eventingv1.Broker, t *eventingv1.Trigger, s *v1alpha1.RabbitmqSource) map[string]string
- func MakeSecret(name, typeString, namespace, url string, owner kmeta.OwnerRefable) *corev1.Secret
- func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)
- func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy
- func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange
- func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy
- func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue
- func ReconcileSecret(ctx context.Context, secretLister corev1listers.SecretLister, ...) error
- func SecretLabels(resourceName, typeString string) map[string]string
- func SecretName(resourceName, typeString string) string
- func VHostHandler(broker string, vhost string) string
- func Watcher(testChannel chan bool, rabbitmqHelper RabbitMQHelper)
- type BindingArgs
- type DeleteResourceArgs
- type ExchangeArgs
- type Message
- func (m *Message) Finish(error) error
- func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
- func (m *Message) GetExtension(name string) interface{}
- func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)
- func (m *Message) ReadEncoding() binding.Encoding
- func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error
- type QueueArgs
- type Rabbit
- func (r *Rabbit) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error
- func (r *Rabbit) GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error)
- func (r *Rabbit) RabbitMQURL(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)
- func (r *Rabbit) ReconcileBinding(ctx context.Context, args *BindingArgs) (Result, error)
- func (r *Rabbit) ReconcileBrokerDLXPolicy(ctx context.Context, args *QueueArgs) (Result, error)
- func (r *Rabbit) ReconcileDLQPolicy(ctx context.Context, args *QueueArgs) (Result, error)
- func (r *Rabbit) ReconcileExchange(ctx context.Context, args *ExchangeArgs) (Result, error)
- func (r *Rabbit) ReconcileQueue(ctx context.Context, args *QueueArgs) (Result, error)
- type RabbitMQBadConnectionMock
- type RabbitMQChannelInterface
- type RabbitMQChannelMock
- func (rm *RabbitMQChannelMock) Confirm(a bool) error
- func (rm *RabbitMQChannelMock) Consume(a string, b string, c bool, d bool, e bool, f bool, t amqp.Table) (<-chan amqp.Delivery, error)
- func (rm *RabbitMQChannelMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error
- func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error)
- func (rm *RabbitMQChannelMock) Qos(a int, b int, c bool) error
- func (rm *RabbitMQChannelMock) QueueInspect(string) (amqp.Queue, error)
- type RabbitMQConnection
- type RabbitMQConnectionInterface
- type RabbitMQConnectionMock
- type RabbitMQHelper
- func (r *RabbitMQHelper) CleanupRabbitMQ(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger)
- func (r *RabbitMQHelper) CloseRabbitMQConnections(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger)
- func (r *RabbitMQHelper) SetupRabbitMQ(RabbitMQURL string, ...) (RabbitMQConnectionInterface, RabbitMQChannelInterface, error)
- func (r *RabbitMQHelper) SignalRetry(retry bool)
- func (r *RabbitMQHelper) WaitForRetrySignal() bool
- func (r *RabbitMQHelper) WatchRabbitMQConnections(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface, ...)
- type RabbitMQHelperInterface
- type Result
- type Service
Constants ¶
const ( BindingKey = "x-knative-trigger" DLQBindingKey = "x-knative-dlq" )
const ( TriggerLabelKey = "eventing.knative.dev/trigger" SourceLabelKey = "eventing.knative.dev/SourceName" )
const (
BrokerURLSecretKey = "brokerURL"
)
const CA_SECRET_KEYNAME = "caSecretName"
Variables ¶
This section is empty.
Functions ¶
func ChannelConfirm ¶ added in v0.34.0
func ChannelConfirm(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
func ChannelQoS ¶ added in v0.34.0
func ChannelQoS(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error
func CloudEventToRabbitMQMessage ¶ added in v0.35.0
func CloudEventToRabbitMQMessage(event *cloudevents.Event, tp, ts string) *amqp.Publishing
func ConvertMessageToHTTPRequest ¶ added in v0.32.0
func ConvertToCloudEvent ¶ added in v0.32.0
func Labels ¶ added in v0.31.0
func Labels(b *eventingv1.Broker, t *eventingv1.Trigger, s *v1alpha1.RabbitmqSource) map[string]string
Labels generates the labels for a RabbitMQ resource. Used by exchanges, queues, and bindings created by broker, trigger, and source controllers.
func MakeSecret ¶ added in v0.33.0
func MakeSecret(name, typeString, namespace, url string, owner kmeta.OwnerRefable) *corev1.Secret
MakeSecret creates the secret for Broker deployments for Rabbit Broker.
func NewBinding ¶ added in v0.31.0
func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)
func NewBrokerDLXPolicy ¶ added in v0.32.0
func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy
NewBrokerDLXPolicy configures the broker dead letter exchange for trigger queues that do not have a DLX defined.
func NewExchange ¶ added in v0.31.0
func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange
NewExchange returns an `exchange.rabbitmq.com` object used by trigger, broker, and source reconcilers. When used by trigger and broker, exchange properties such as `durable`, `autoDelete`, and `type` are hardcoded.
func NewPolicy ¶ added in v0.31.0
func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy
func NewQueue ¶ added in v0.31.0
func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue
func ReconcileSecret ¶ added in v0.33.0
func ReconcileSecret(ctx context.Context, secretLister corev1listers.SecretLister, kubeClientSet kubernetes.Interface, s *corev1.Secret) error
ReconcileSecret reconciles the K8s Secret 's'.
func SecretLabels ¶ added in v0.33.0
SecretLabels generates the labels present on all resources representing the secret of the given Broker.
func SecretName ¶ added in v0.33.0
func VHostHandler ¶ added in v0.35.0
func Watcher ¶ added in v0.34.0
func Watcher(testChannel chan bool, rabbitmqHelper RabbitMQHelper)
Types ¶
type BindingArgs ¶ added in v0.31.0
type BindingArgs struct { Name string Namespace string Owner metav1.OwnerReference RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference Vhost string Source string Destination string Labels map[string]string Filters map[string]string ClusterName string }
type DeleteResourceArgs ¶ added in v0.34.0
type ExchangeArgs ¶ added in v0.31.0
type ExchangeArgs struct { Name string Namespace string RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference RabbitMQURL *url.URL Broker *eventingv1.Broker Trigger *eventingv1.Trigger Source *v1alpha1.RabbitmqSource }
ExchangeArgs are the arguments to create a RabbitMQ Exchange.
type Message ¶ added in v0.32.0
type Message struct { Value []byte Headers map[string][]byte ContentType string // contains filtered or unexported fields }
Message holds a RabbitMQ message. This message *can* be read several times safely.
func NewMessage ¶ added in v0.32.0
NewMessage returns a binding.Message that holds the provided RabbitMQ message components. The returned binding.Message *can* be read several times safely.
func NewMessageFromDelivery ¶ added in v0.32.0
NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message. The returned binding.Message *can* be read several times safely.
func (*Message) GetAttribute ¶ added in v0.32.0
func (*Message) GetExtension ¶ added in v0.32.0
func (*Message) ReadBinary ¶ added in v0.32.0
func (*Message) ReadEncoding ¶ added in v0.32.0
func (*Message) ReadStructured ¶ added in v0.32.0
type QueueArgs ¶ added in v0.31.0
type QueueArgs struct { Name string Namespace string QueueName string RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference Owner metav1.OwnerReference Labels map[string]string DLXName *string Source *v1alpha1.RabbitmqSource BrokerUID string QueueType eventingv1alpha1.QueueType }
type Rabbit ¶
type Rabbit struct { rabbitclientset.Interface // contains filtered or unexported fields }
func (*Rabbit) DeleteResource ¶ added in v0.34.0
func (r *Rabbit) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error
func (*Rabbit) GetRabbitMQCASecret ¶ added in v0.35.0
func (r *Rabbit) GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error)
func (*Rabbit) RabbitMQURL ¶ added in v0.33.0
func (r *Rabbit) RabbitMQURL(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)
func (*Rabbit) ReconcileBinding ¶
func (*Rabbit) ReconcileBrokerDLXPolicy ¶ added in v0.32.0
func (*Rabbit) ReconcileDLQPolicy ¶ added in v0.33.0
func (*Rabbit) ReconcileExchange ¶
type RabbitMQBadConnectionMock ¶ added in v0.34.0
type RabbitMQBadConnectionMock struct{}
func (*RabbitMQBadConnectionMock) ChannelWrapper ¶ added in v0.34.0
func (rm *RabbitMQBadConnectionMock) ChannelWrapper() (RabbitMQChannelInterface, error)
func (*RabbitMQBadConnectionMock) Close ¶ added in v0.34.0
func (rm *RabbitMQBadConnectionMock) Close() error
func (*RabbitMQBadConnectionMock) IsClosed ¶ added in v0.34.0
func (rm *RabbitMQBadConnectionMock) IsClosed() bool
func (*RabbitMQBadConnectionMock) NotifyClose ¶ added in v0.34.0
func (rm *RabbitMQBadConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error
type RabbitMQChannelInterface ¶ added in v0.34.0
type RabbitMQChannelInterface interface { NotifyClose(chan *amqp.Error) chan *amqp.Error Qos(int, int, bool) error Confirm(bool) error Consume(string, string, bool, bool, bool, bool, amqp.Table) (<-chan amqp.Delivery, error) PublishWithDeferredConfirm(string, string, bool, bool, amqp.Publishing) (*amqp.DeferredConfirmation, error) QueueInspect(string) (amqp.Queue, error) }
type RabbitMQChannelMock ¶ added in v0.34.0
type RabbitMQChannelMock struct { NotifyCloseChannel chan *amqp.Error ConsumeChannel <-chan amqp.Delivery }
func (*RabbitMQChannelMock) Confirm ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) Confirm(a bool) error
func (*RabbitMQChannelMock) NotifyClose ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error
func (*RabbitMQChannelMock) PublishWithDeferredConfirm ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error)
func (*RabbitMQChannelMock) Qos ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) Qos(a int, b int, c bool) error
func (*RabbitMQChannelMock) QueueInspect ¶ added in v0.34.0
func (rm *RabbitMQChannelMock) QueueInspect(string) (amqp.Queue, error)
type RabbitMQConnection ¶ added in v0.34.0
type RabbitMQConnection struct {
// contains filtered or unexported fields
}
func NewConnection ¶ added in v0.34.0
func NewConnection(conn interface{}) *RabbitMQConnection
func (*RabbitMQConnection) ChannelWrapper ¶ added in v0.34.0
func (r *RabbitMQConnection) ChannelWrapper() (RabbitMQChannelInterface, error)
func (*RabbitMQConnection) Close ¶ added in v0.34.0
func (r *RabbitMQConnection) Close() error
func (*RabbitMQConnection) IsClosed ¶ added in v0.34.0
func (r *RabbitMQConnection) IsClosed() bool
func (*RabbitMQConnection) NotifyClose ¶ added in v0.34.0
func (r *RabbitMQConnection) NotifyClose(c chan *amqp.Error) chan *amqp.Error
type RabbitMQConnectionInterface ¶ added in v0.34.0
type RabbitMQConnectionInterface interface { ChannelWrapper() (RabbitMQChannelInterface, error) NotifyClose(chan *amqp.Error) chan *amqp.Error Close() error IsClosed() bool }
func DialWrapper ¶ added in v0.34.0
func DialWrapper(url string) (RabbitMQConnectionInterface, error)
func ValidConnectionDial ¶ added in v0.34.0
func ValidConnectionDial(url string) (RabbitMQConnectionInterface, error)
func ValidDial ¶ added in v0.34.0
func ValidDial(url string) (RabbitMQConnectionInterface, error)
type RabbitMQConnectionMock ¶ added in v0.34.0
type RabbitMQConnectionMock struct{}
func (*RabbitMQConnectionMock) ChannelWrapper ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) ChannelWrapper() (RabbitMQChannelInterface, error)
func (*RabbitMQConnectionMock) Close ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) Close() error
func (*RabbitMQConnectionMock) IsClosed ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) IsClosed() bool
func (*RabbitMQConnectionMock) NotifyClose ¶ added in v0.34.0
func (rm *RabbitMQConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error
type RabbitMQHelper ¶ added in v0.31.0
type RabbitMQHelper struct { DialFunc func(string) (RabbitMQConnectionInterface, error) // contains filtered or unexported fields }
func (*RabbitMQHelper) CleanupRabbitMQ ¶ added in v0.31.0
func (r *RabbitMQHelper) CleanupRabbitMQ(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger)
func (*RabbitMQHelper) CloseRabbitMQConnections ¶ added in v0.31.0
func (r *RabbitMQHelper) CloseRabbitMQConnections(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger)
func (*RabbitMQHelper) SetupRabbitMQ ¶ added in v0.31.0
func (r *RabbitMQHelper) SetupRabbitMQ( RabbitMQURL string, configFunction func(RabbitMQConnectionInterface, RabbitMQChannelInterface) error, logger *zap.SugaredLogger) (RabbitMQConnectionInterface, RabbitMQChannelInterface, error)
func (*RabbitMQHelper) SignalRetry ¶ added in v0.31.0
func (r *RabbitMQHelper) SignalRetry(retry bool)
func (*RabbitMQHelper) WaitForRetrySignal ¶ added in v0.32.0
func (r *RabbitMQHelper) WaitForRetrySignal() bool
func (*RabbitMQHelper) WatchRabbitMQConnections ¶ added in v0.31.0
func (r *RabbitMQHelper) WatchRabbitMQConnections( connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface, logger *zap.SugaredLogger)
type RabbitMQHelperInterface ¶ added in v0.34.0
type RabbitMQHelperInterface interface { SetupRabbitMQ(string, func(RabbitMQConnectionInterface, RabbitMQChannelInterface) error, *zap.SugaredLogger) (RabbitMQConnectionInterface, RabbitMQChannelInterface, error) WatchRabbitMQConnections(RabbitMQConnectionInterface, RabbitMQChannelInterface, *zap.SugaredLogger) SignalRetry(bool) WaitForRetrySignal() bool CloseRabbitMQConnections(RabbitMQConnectionInterface, *zap.SugaredLogger) CleanupRabbitMQ(connection RabbitMQConnectionInterface, logger *zap.SugaredLogger) }
func NewRabbitMQHelper ¶ added in v0.31.0
func NewRabbitMQHelper( cycleDuration time.Duration, retryChannel chan bool, dialFunc func(string) (RabbitMQConnectionInterface, error)) RabbitMQHelperInterface
type Service ¶
type Service interface { RabbitmqV1beta1() rmqv1beta1.RabbitmqV1beta1Interface RabbitMQURL(context.Context, *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error) ReconcileExchange(context.Context, *ExchangeArgs) (Result, error) ReconcileQueue(context.Context, *QueueArgs) (Result, error) ReconcileBinding(context.Context, *BindingArgs) (Result, error) ReconcileBrokerDLXPolicy(context.Context, *QueueArgs) (Result, error) ReconcileDLQPolicy(context.Context, *QueueArgs) (Result, error) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error) }