Documentation ¶
Index ¶
- Constants
- Variables
- func MakeFactory(ctor Ctor, logger *zap.Logger) Factory
- func NewChannelOwnerRef() metav1.OwnerReference
- func NewConfig(options ...KafkaConfigOption) *config.EventingKafkaConfig
- func NewDeploymentDeleteActionImpl(deployment *appsv1.Deployment) clientgotesting.DeleteActionImpl
- func NewDeploymentUpdateActionImpl(deployment *appsv1.Deployment) clientgotesting.UpdateActionImpl
- func NewEnvironment() *env.Environment
- func NewFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
- func NewKafkaChannel(options ...KafkaChannelOption) *kafkav1beta1.KafkaChannel
- func NewKafkaChannelDispatcherDeployment(options ...DeploymentOption) *appsv1.Deployment
- func NewKafkaChannelDispatcherService(options ...ServiceOption) *corev1.Service
- func NewKafkaChannelFailedFinalizationEvent() string
- func NewKafkaChannelFailedReconciliationEvent() string
- func NewKafkaChannelFinalizerUpdateEvent() string
- func NewKafkaChannelLabelUpdate(kafkachannel *kafkav1beta1.KafkaChannel) clientgotesting.UpdateActionImpl
- func NewKafkaChannelReceiverDeployment(options ...DeploymentOption) *appsv1.Deployment
- func NewKafkaChannelReceiverService(options ...ServiceOption) *corev1.Service
- func NewKafkaChannelService(options ...ServiceOption) *corev1.Service
- func NewKafkaChannelSuccessfulFinalizedEvent() string
- func NewKafkaChannelSuccessfulReconciliationEvent() string
- func NewKafkaSecret(options ...KafkaSecretOption) *corev1.Secret
- func NewKafkaSecretFailedReconciliationEvent() string
- func NewKafkaSecretFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
- func NewKafkaSecretFinalizerUpdateEvent() string
- func NewKafkaSecretSuccessfulFinalizedEvent() string
- func NewKafkaSecretSuccessfulReconciliationEvent() string
- func NewSecretOwnerRef() metav1.OwnerReference
- func NewServiceDeleteActionImpl(service *corev1.Service) clientgotesting.DeleteActionImpl
- func NewServiceUpdateActionImpl(service *corev1.Service) clientgotesting.UpdateActionImpl
- func WithAddress(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithAnnotations(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithDeletionTimestamp(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithDeletionTimestampDeployment(deployment *appsv1.Deployment)
- func WithDeletionTimestampService(service *corev1.Service)
- func WithDispatcherDeploymentReady(_ *kafkav1beta1.KafkaChannel)
- func WithDispatcherFailed(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithFinalizer(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithInitializedConditions(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithKafkaChannelServiceFailed(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithKafkaChannelServiceReady(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithKafkaSecretDeleted(secret *corev1.Secret)
- func WithKafkaSecretFinalizer(secret *corev1.Secret)
- func WithLabels(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithMetaData(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithNoDispatcherResources(kafkaConfig *config.EventingKafkaConfig)
- func WithNoReceiverResources(kafkaConfig *config.EventingKafkaConfig)
- func WithReceiverDeploymentFailed(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithReceiverDeploymentFinalized(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithReceiverDeploymentReady(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithReceiverServiceFailed(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithReceiverServiceFinalized(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithReceiverServiceReady(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithTopicReady(kafkachannel *kafkav1beta1.KafkaChannel)
- func WithoutFinalizersDeployment(deployment *appsv1.Deployment)
- func WithoutFinalizersService(service *corev1.Service)
- func WithoutResources(deployment *appsv1.Deployment)
- type Ctor
- type DeploymentOption
- type KafkaChannelOption
- type KafkaConfigOption
- type KafkaSecretOption
- type Listers
- func (l *Listers) GetAllObjects() []runtime.Object
- func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister
- func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister
- func (l *Listers) GetEventingObjects() []runtime.Object
- func (l *Listers) GetEventsObjects() []runtime.Object
- func (l *Listers) GetKafkaChannelLister() kafkalisters.KafkaChannelLister
- func (l *Listers) GetKafkaChannelObjects() []runtime.Object
- func (l *Listers) GetKubeObjects() []runtime.Object
- func (l *Listers) GetSecretLister() corev1listers.SecretLister
- func (l *Listers) GetServiceLister() corev1listers.ServiceLister
- type MockAdminClient
- func (m *MockAdminClient) Close() error
- func (m *MockAdminClient) CloseCalled() bool
- func (m *MockAdminClient) CreateTopic(ctx context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
- func (m *MockAdminClient) CreateTopicsCalled() bool
- func (m *MockAdminClient) DeleteTopic(ctx context.Context, topicName string) *sarama.TopicError
- func (m *MockAdminClient) DeleteTopicsCalled() bool
- func (m *MockAdminClient) GetKafkaSecretName(_ string) string
- type ServiceOption
Constants ¶
const ( // Prometheus MetricsPort MetricsPortName = "metrics" // Environment Test Data ServiceAccount = "TestServiceAccount" KafkaAdminType = "kafka" MetricsPort = 9876 MetricsDomain = "eventing-kafka" HealthPort = 8082 ResyncPeriod = 3600 * time.Minute ReceiverImage = "TestReceiverImage" ReceiverReplicas = 1 DispatcherImage = "TestDispatcherImage" DispatcherReplicas = 1 DefaultNumPartitions = 4 DefaultReplicationFactor = 1 DefaultRetentionMillis = 99999 // Channel Test Data KafkaChannelNamespace = "kafkachannel-namespace" KafkaChannelName = "kafkachannel-name" KafkaChannelKey = KafkaChannelNamespace + "/" + KafkaChannelName KafkaSecretNamespace = "eventing-test-ns" // Needs To Match system.Namespace() Call In Reconciliation KafkaSecretName = "kafkasecret-name" KafkaSecretKey = KafkaSecretNamespace + "/" + KafkaSecretName ReceiverDeploymentName = KafkaSecretName + "-b9176d5f-receiver" // Truncated MD5 Hash Of KafkaSecretName ReceiverServiceName = ReceiverDeploymentName TopicName = KafkaChannelNamespace + "." + KafkaChannelName KafkaSecretDataValueBrokers = "TestKafkaSecretDataBrokers" KafkaSecretDataValueUsername = "TestKafkaSecretDataUsername" KafkaSecretDataValuePassword = "TestKafkaSecretDataPassword" KafkaSecretDataValueSaslType = "PLAIN" // ChannelSpec Test Data NumPartitions = 123 ReplicationFactor = 456 // Test MetaData ErrorString = "Expected Mock Test Error" SuccessString = "Expected Mock Test Success" // Test Dispatcher Resources DispatcherMemoryRequest = "20Mi" DispatcherCpuRequest = "100m" DispatcherMemoryLimit = "50Mi" DispatcherCpuLimit = "300m" // Test Receiver Resources ReceiverMemoryRequest = "10Mi" ReceiverMemoryLimit = "20Mi" ReceiverCpuRequest = "10m" ReceiverCpuLimit = "100m" ControllerConfigYaml = `` /* 463-byte string literal not displayed */ SaramaConfigYaml = `` /* 444-byte string literal not displayed */ )
Constants
Variables ¶
var ( DefaultRetentionMillisString = strconv.FormatInt(DefaultRetentionMillis, 10) DeletionTimestamp = metav1.Now() )
Functions ¶
func MakeFactory ¶
MakeFactory creates a reconciler factory with fake clients and controller created by `ctor`.
func NewChannelOwnerRef ¶
func NewChannelOwnerRef() metav1.OwnerReference
Utility Function For Creating A New OwnerReference Model For The Test Channel
func NewConfig ¶
func NewConfig(options ...KafkaConfigOption) *config.EventingKafkaConfig
Utility Function For Creating A New EventingKafkaConfig With The Required Config Fields Set
func NewDeploymentDeleteActionImpl ¶
func NewDeploymentDeleteActionImpl(deployment *appsv1.Deployment) clientgotesting.DeleteActionImpl
Utility Function For Creating A DeleteActionImpl For A Deployment Delete Command
func NewDeploymentUpdateActionImpl ¶
func NewDeploymentUpdateActionImpl(deployment *appsv1.Deployment) clientgotesting.UpdateActionImpl
Utility Function For Creating An UpdateActionImpl For A Deployment Update Command
func NewFinalizerPatchActionImpl ¶
func NewFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
Utility Function For Creating A PatchActionImpl For The Finalizer Patch Command
func NewKafkaChannel ¶
func NewKafkaChannel(options ...KafkaChannelOption) *kafkav1beta1.KafkaChannel
Utility Function For Creating A Custom KafkaChannel For Testing
func NewKafkaChannelDispatcherDeployment ¶
func NewKafkaChannelDispatcherDeployment(options ...DeploymentOption) *appsv1.Deployment
Utility Function For Creating A Custom KafkaChannel Dispatcher Deployment For Testing
func NewKafkaChannelDispatcherService ¶
func NewKafkaChannelDispatcherService(options ...ServiceOption) *corev1.Service
Utility Function For Creating A Custom KafkaChannel Dispatcher Service For Testing
func NewKafkaChannelFailedFinalizationEvent ¶
func NewKafkaChannelFailedFinalizationEvent() string
Utility Function For Creating A Failed KafkaChannel Finalization Event
func NewKafkaChannelFailedReconciliationEvent ¶
func NewKafkaChannelFailedReconciliationEvent() string
Utility Function For Creating A Failed KafkaChannel Reconciled Event
func NewKafkaChannelFinalizerUpdateEvent ¶
func NewKafkaChannelFinalizerUpdateEvent() string
Utility Function For Creating A Successful KafkaChannel Finalizer Update Event
func NewKafkaChannelLabelUpdate ¶
func NewKafkaChannelLabelUpdate(kafkachannel *kafkav1beta1.KafkaChannel) clientgotesting.UpdateActionImpl
Utility Function For Creating An UpdateActionImpl For The KafkaChannel Labels Update Command
func NewKafkaChannelReceiverDeployment ¶
func NewKafkaChannelReceiverDeployment(options ...DeploymentOption) *appsv1.Deployment
Utility Function For Creating A Receiver Deployment For The Test Channel
func NewKafkaChannelReceiverService ¶
func NewKafkaChannelReceiverService(options ...ServiceOption) *corev1.Service
Utility Function For Creating A Custom Receiver Service For Testing
func NewKafkaChannelService ¶
func NewKafkaChannelService(options ...ServiceOption) *corev1.Service
Utility Function For Creating A Custom KafkaChannel "Channel" Service For Testing
func NewKafkaChannelSuccessfulFinalizedEvent ¶
func NewKafkaChannelSuccessfulFinalizedEvent() string
Utility Function For Creating A Successful KafkaChannel Finalized Event
func NewKafkaChannelSuccessfulReconciliationEvent ¶
func NewKafkaChannelSuccessfulReconciliationEvent() string
Utility Function For Creating A Successful KafkaChannel Reconciled Event
func NewKafkaSecret ¶
func NewKafkaSecret(options ...KafkaSecretOption) *corev1.Secret
Create A New Kafka Auth Secret For Testing
func NewKafkaSecretFailedReconciliationEvent ¶
func NewKafkaSecretFailedReconciliationEvent() string
Utility Function For Creating A Failed Kafka Secret Reconciled Event
func NewKafkaSecretFinalizerPatchActionImpl ¶
func NewKafkaSecretFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
Utility Function For Creating A PatchActionImpl For The Kafka Secret Finalizer Patch Command
func NewKafkaSecretFinalizerUpdateEvent ¶
func NewKafkaSecretFinalizerUpdateEvent() string
Utility Function For Creating A Successful Kafka Secret Finalizer Update Event
func NewKafkaSecretSuccessfulFinalizedEvent ¶
func NewKafkaSecretSuccessfulFinalizedEvent() string
Utility Function For Creating A Successful Kafka Secret Finalized Event
func NewKafkaSecretSuccessfulReconciliationEvent ¶
func NewKafkaSecretSuccessfulReconciliationEvent() string
Utility Function For Creating A Successful Kafka Secret Reconciled Event
func NewSecretOwnerRef ¶
func NewSecretOwnerRef() metav1.OwnerReference
Utility Function For Creating A New OwnerReference Model For The Test Kafka Secret
func NewServiceDeleteActionImpl ¶
func NewServiceDeleteActionImpl(service *corev1.Service) clientgotesting.DeleteActionImpl
Utility Function For Creating A DeleteActionImpl For A Service Delete Command
func NewServiceUpdateActionImpl ¶
func NewServiceUpdateActionImpl(service *corev1.Service) clientgotesting.UpdateActionImpl
Utility Function For Creating An UpdateActionImpl For A Service Update Command
func WithAddress ¶
func WithAddress(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Address
func WithAnnotations ¶
func WithAnnotations(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Annotations
func WithDeletionTimestamp ¶
func WithDeletionTimestamp(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's DeletionTimestamp To Current Time
func WithDeletionTimestampDeployment ¶
func WithDeletionTimestampDeployment(deployment *appsv1.Deployment)
Set The Deployment's DeletionTimestamp To Current Time
func WithDeletionTimestampService ¶
Set The Service's DeletionTimestamp To Current Time
func WithDispatcherDeploymentReady ¶
func WithDispatcherDeploymentReady(_ *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Dispatcher Deployment As READY
func WithDispatcherFailed ¶
func WithDispatcherFailed(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Dispatcher Deployment As Failed
func WithFinalizer ¶
func WithFinalizer(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Finalizer
func WithInitializedConditions ¶
func WithInitializedConditions(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Status To Initialized State
func WithKafkaChannelServiceFailed ¶
func WithKafkaChannelServiceFailed(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Service As Failed
func WithKafkaChannelServiceReady ¶
func WithKafkaChannelServiceReady(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Service As READY
func WithKafkaSecretDeleted ¶
Set The Kafka Secret's DeletionTimestamp To Current Time
func WithKafkaSecretFinalizer ¶
Set The Kafka Secret's Finalizer
func WithLabels ¶
func WithLabels(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Labels
func WithMetaData ¶
func WithMetaData(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's MetaData
func WithNoDispatcherResources ¶ added in v0.20.0
func WithNoDispatcherResources(kafkaConfig *config.EventingKafkaConfig)
Remove The Dispatcher Resource Requests And Limits
func WithNoReceiverResources ¶ added in v0.20.0
func WithNoReceiverResources(kafkaConfig *config.EventingKafkaConfig)
Remove The Receiver Resource Requests And Limits
func WithReceiverDeploymentFailed ¶
func WithReceiverDeploymentFailed(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Receiver Deployment As Failed
func WithReceiverDeploymentFinalized ¶
func WithReceiverDeploymentFinalized(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Receiver Deployment As Finalized
func WithReceiverDeploymentReady ¶
func WithReceiverDeploymentReady(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Receiver Deployment As READY
func WithReceiverServiceFailed ¶
func WithReceiverServiceFailed(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Receiver Service As Failed
func WithReceiverServiceFinalized ¶
func WithReceiverServiceFinalized(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Receiver Service As Finalized
func WithReceiverServiceReady ¶
func WithReceiverServiceReady(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Receiver Service As READY
func WithTopicReady ¶
func WithTopicReady(kafkachannel *kafkav1beta1.KafkaChannel)
Set The KafkaChannel's Topic As READY
func WithoutFinalizersDeployment ¶
func WithoutFinalizersDeployment(deployment *appsv1.Deployment)
Clear The Specified Deployment's Finalizers
func WithoutFinalizersService ¶
Clear The Specified Service's Finalizers
func WithoutResources ¶ added in v0.20.0
func WithoutResources(deployment *appsv1.Deployment)
Types ¶
type Ctor ¶
type Ctor func(context.Context, *Listers, configmap.Watcher, []KafkaConfigOption) controller.Reconciler
Ctor functions create a k8s controller with given params.
type DeploymentOption ¶
type DeploymentOption func(service *appsv1.Deployment)
type KafkaChannelOption ¶
type KafkaChannelOption func(*kafkav1beta1.KafkaChannel)
KafkaChannelOption Enables Customization Of A KafkaChannel
type KafkaConfigOption ¶ added in v0.20.0
type KafkaConfigOption func(kafkaConfig *config.EventingKafkaConfig)
KafkaConfigOption Enables Customization Of An EventingKafkaConfig
type KafkaSecretOption ¶
KafkaSecretOption Enables Customization Of A Kafka Secret
type Listers ¶
type Listers struct {
// contains filtered or unexported fields
}
func NewListers ¶
func (*Listers) GetAllObjects ¶
func (*Listers) GetDeploymentLister ¶
func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister
func (*Listers) GetEndpointsLister ¶
func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister
func (*Listers) GetEventingObjects ¶
func (*Listers) GetEventsObjects ¶
func (*Listers) GetKafkaChannelLister ¶
func (l *Listers) GetKafkaChannelLister() kafkalisters.KafkaChannelLister
func (*Listers) GetKafkaChannelObjects ¶
func (*Listers) GetKubeObjects ¶
func (*Listers) GetSecretLister ¶
func (l *Listers) GetSecretLister() corev1listers.SecretLister
func (*Listers) GetServiceLister ¶
func (l *Listers) GetServiceLister() corev1listers.ServiceLister
type MockAdminClient ¶
type MockAdminClient struct { MockCreateTopicFunc func(context.Context, string, *sarama.TopicDetail) *sarama.TopicError MockDeleteTopicFunc func(context.Context, string) *sarama.TopicError MockCloseFunc func() error // contains filtered or unexported fields }
Mock Kafka AdminClient Implementation
func (*MockAdminClient) Close ¶
func (m *MockAdminClient) Close() error
Mock Kafka AdminClient Close Function - NoOp
func (*MockAdminClient) CloseCalled ¶
func (m *MockAdminClient) CloseCalled() bool
Check On Calls To Close()
func (*MockAdminClient) CreateTopic ¶
func (m *MockAdminClient) CreateTopic(ctx context.Context, topicName string, topicDetail *sarama.TopicDetail) *sarama.TopicError
Mock Kafka AdminClient CreateTopic() Function - Calls Custom CreateTopic() If Specified, Otherwise Returns Success
func (*MockAdminClient) CreateTopicsCalled ¶
func (m *MockAdminClient) CreateTopicsCalled() bool
Check On Calls To CreateTopics()
func (*MockAdminClient) DeleteTopic ¶
func (m *MockAdminClient) DeleteTopic(ctx context.Context, topicName string) *sarama.TopicError
Mock Kafka AdminClient DeleteTopic() Function - Calls Custom DeleteTopic() If Specified, Otherwise Returns Success
func (*MockAdminClient) DeleteTopicsCalled ¶
func (m *MockAdminClient) DeleteTopicsCalled() bool
Check On Calls To DeleteTopics()
func (*MockAdminClient) GetKafkaSecretName ¶
func (m *MockAdminClient) GetKafkaSecretName(_ string) string
Mock Kafka Secret Name Function - Return Test Data
type ServiceOption ¶
Service / Deployment Options For Customizing Test Data