Documentation ¶
Index ¶
- func MakeFactory(ctor Ctor) Factory
- func NewKafkaChannel(name string, namespace string, options ...KafkaChannelOption) *v1beta1.KafkaChannel
- func WithInitKafkaChannelConditions(kafkachannel *v1beta1.KafkaChannel)
- func WithKafkaChannelReady(kafkachannel *v1beta1.KafkaChannel)
- type Ctor
- type KafkaChannelOption
- type Listers
- func (l *Listers) GetAllObjects() []runtime.Object
- func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister
- func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister
- func (l *Listers) GetKafkaChannelLister() messaginglisters.KafkaChannelLister
- func (l *Listers) GetKubeObjects() []runtime.Object
- func (l *Listers) GetMessagingObjects() []runtime.Object
- func (l *Listers) GetServiceLister() corev1listers.ServiceLister
- type MockConsumerGroupClaim
- type MockConsumerGroupSession
- func (m MockConsumerGroupSession) Claims() map[string][]int32
- func (m MockConsumerGroupSession) Commit()
- func (m MockConsumerGroupSession) Context() context.Context
- func (m MockConsumerGroupSession) GenerationID() int32
- func (m MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
- func (m MockConsumerGroupSession) MarkMessageCalled() bool
- func (m MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
- func (m MockConsumerGroupSession) MemberID() string
- func (m MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
- type MockMessageDispatcher
- func (m *MockMessageDispatcher) DispatchMessage(ctx context.Context, message cloudevents.Message, ...) (*channel.DispatchExecutionInfo, error)
- func (m *MockMessageDispatcher) DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, headers http.Header, ...) (*channel.DispatchExecutionInfo, error)
- func (m *MockMessageDispatcher) Message() cloudevents.Message
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MakeFactory ¶
func MakeFactory(ctor Ctor) Factory
MakeFactory creates a reconciler factory with fake clients and controller created by `ctor`.
func NewKafkaChannel ¶
func NewKafkaChannel(name string, namespace string, options ...KafkaChannelOption) *v1beta1.KafkaChannel
NewKafkaChannel creates an KafkaChannel with KafkaChannelOptions.
func WithInitKafkaChannelConditions ¶
func WithInitKafkaChannelConditions(kafkachannel *v1beta1.KafkaChannel)
func WithKafkaChannelReady ¶
func WithKafkaChannelReady(kafkachannel *v1beta1.KafkaChannel)
Types ¶
type Ctor ¶
type Ctor func( listers *Listers, kafkaClient versioned.Interface, eventRecorder record.EventRecorder, status consumer.SubscriberStatusMap, ) controller.Reconciler
Ctor functions create a k8s controller with given params.
type KafkaChannelOption ¶
type KafkaChannelOption func(*v1beta1.KafkaChannel)
KafkaChannelOption enables further configuration of a KafkaChannel.
func WithKafkaChannelAddress ¶
func WithKafkaChannelAddress(a string) KafkaChannelOption
func WithSubscriber ¶
func WithSubscriber(uid types.UID, uri string) KafkaChannelOption
func WithSubscriberNotReady ¶ added in v0.25.0
func WithSubscriberNotReady(uid types.UID, message string) KafkaChannelOption
func WithSubscriberReady ¶
func WithSubscriberReady(uid types.UID) KafkaChannelOption
type Listers ¶
type Listers struct {
// contains filtered or unexported fields
}
func NewListers ¶
func (*Listers) GetAllObjects ¶
func (*Listers) GetDeploymentLister ¶
func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister
func (*Listers) GetEndpointsLister ¶
func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister
func (*Listers) GetKafkaChannelLister ¶
func (l *Listers) GetKafkaChannelLister() messaginglisters.KafkaChannelLister
func (*Listers) GetKubeObjects ¶
func (*Listers) GetMessagingObjects ¶
func (*Listers) GetServiceLister ¶
func (l *Listers) GetServiceLister() corev1listers.ServiceLister
type MockConsumerGroupClaim ¶
type MockConsumerGroupClaim struct { MessageChan chan *sarama.ConsumerMessage // contains filtered or unexported fields }
Define The Mock ConsumerGroupSession
func NewMockConsumerGroupClaim ¶
func NewMockConsumerGroupClaim(t *testing.T) MockConsumerGroupClaim
Mock ConsumerGroupClaim Constructor
func (MockConsumerGroupClaim) HighWaterMarkOffset ¶
func (m MockConsumerGroupClaim) HighWaterMarkOffset() int64
func (MockConsumerGroupClaim) InitialOffset ¶
func (m MockConsumerGroupClaim) InitialOffset() int64
func (MockConsumerGroupClaim) Messages ¶
func (m MockConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage
func (MockConsumerGroupClaim) Partition ¶
func (m MockConsumerGroupClaim) Partition() int32
func (MockConsumerGroupClaim) Topic ¶
func (m MockConsumerGroupClaim) Topic() string
type MockConsumerGroupSession ¶
type MockConsumerGroupSession struct { MarkMessageChan chan *sarama.ConsumerMessage // contains filtered or unexported fields }
Define The Mock ConsumerGroupSession
func NewMockConsumerGroupSession ¶
func NewMockConsumerGroupSession(t *testing.T) MockConsumerGroupSession
Mock ConsumerGroupSession Constructor
func (MockConsumerGroupSession) Claims ¶
func (m MockConsumerGroupSession) Claims() map[string][]int32
func (MockConsumerGroupSession) Commit ¶
func (m MockConsumerGroupSession) Commit()
func (MockConsumerGroupSession) Context ¶
func (m MockConsumerGroupSession) Context() context.Context
func (MockConsumerGroupSession) GenerationID ¶
func (m MockConsumerGroupSession) GenerationID() int32
func (MockConsumerGroupSession) MarkMessage ¶
func (m MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string)
func (MockConsumerGroupSession) MarkMessageCalled ¶ added in v0.22.0
func (m MockConsumerGroupSession) MarkMessageCalled() bool
func (MockConsumerGroupSession) MarkOffset ¶
func (m MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string)
func (MockConsumerGroupSession) MemberID ¶
func (m MockConsumerGroupSession) MemberID() string
func (MockConsumerGroupSession) ResetOffset ¶
func (m MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string)
type MockMessageDispatcher ¶
type MockMessageDispatcher struct {
// contains filtered or unexported fields
}
Define The Mock MessageDispatcher
func NewMockMessageDispatcher ¶
func NewMockMessageDispatcher(t *testing.T, headers http.Header, destinationUrl *url.URL, replyUrl *url.URL, deadLetterUrl *url.URL, retryConfig *kncloudevents.RetryConfig, response error) *MockMessageDispatcher
Mock MessageDispatcher Constructor
func (*MockMessageDispatcher) DispatchMessage ¶
func (*MockMessageDispatcher) DispatchMessageWithRetries ¶
func (m *MockMessageDispatcher) DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, headers http.Header, destinationUrl *url.URL, replyUrl *url.URL, deadLetterUrl *url.URL, retryConfig *kncloudevents.RetryConfig, transformers ...binding.Transformer) (*channel.DispatchExecutionInfo, error)
func (*MockMessageDispatcher) Message ¶
func (m *MockMessageDispatcher) Message() cloudevents.Message
Click to show internal directories.
Click to hide internal directories.