Documentation
¶
Index ¶
- Constants
- func MakeFactory(ctor Ctor, logger *zap.Logger) Factory
- func NewChannelOwnerRef() metav1.OwnerReference
- func NewEnvironment() *env.Environment
- func NewFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
- func NewKafkaChannel(options ...KafkaChannelOption) *kafkav1alpha1.KafkaChannel
- func NewKafkaChannelChannelDeployment() *appsv1.Deployment
- func NewKafkaChannelChannelService() *corev1.Service
- func NewKafkaChannelDispatcherDeployment() *appsv1.Deployment
- func NewKafkaChannelDispatcherService() *corev1.Service
- func NewKafkaChannelFailedReconciliationEvent() string
- func NewKafkaChannelFinalizerUpdateEvent() string
- func NewKafkaChannelLabelUpdate(kafkachannel *kafkav1alpha1.KafkaChannel) clientgotesting.UpdateActionImpl
- func NewKafkaChannelService() *corev1.Service
- func NewKafkaChannelSuccessfulFinalizedEvent() string
- func NewKafkaChannelSuccessfulReconciliationEvent() string
- func NewKafkaSecret(options ...KafkaSecretOption) *corev1.Secret
- func NewKafkaSecretFailedReconciliationEvent() string
- func NewKafkaSecretFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
- func NewKafkaSecretFinalizerUpdateEvent() string
- func NewKafkaSecretSuccessfulFinalizedEvent() string
- func NewKafkaSecretSuccessfulReconciliationEvent() string
- func NewSecretOwnerRef() metav1.OwnerReference
- func WithAddress(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithChannelDeploymentFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithChannelDeploymentFinalized(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithChannelDeploymentReady(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithChannelServiceFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithChannelServiceFinalized(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithChannelServiceReady(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithDeletionTimestamp(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithDispatcherDeploymentReady(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithDispatcherFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithFinalizer(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithInitializedConditions(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithKafkaChannelServiceFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithKafkaChannelServiceReady(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithKafkaSecretDeleted(secret *corev1.Secret)
- func WithKafkaSecretFinalizer(secret *corev1.Secret)
- func WithLabels(kafkachannel *kafkav1alpha1.KafkaChannel)
- func WithTopicReady(kafkachannel *kafkav1alpha1.KafkaChannel)
- type Ctor
- type KafkaChannelOption
- type KafkaSecretOption
- type Listers
- func (l *Listers) GetAllObjects() []runtime.Object
- func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister
- func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister
- func (l *Listers) GetEventingObjects() []runtime.Object
- func (l *Listers) GetEventsObjects() []runtime.Object
- func (l *Listers) GetKafkaChannelLister() kafkalisters.KafkaChannelLister
- func (l *Listers) GetKafkaChannelObjects() []runtime.Object
- func (l *Listers) GetKubeObjects() []runtime.Object
- func (l *Listers) GetSecretLister() corev1listers.SecretLister
- func (l *Listers) GetServiceLister() corev1listers.ServiceLister
- type MockAdminClient
- func (m *MockAdminClient) Close()
- func (m *MockAdminClient) CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, ...) (result []kafka.TopicResult, err error)
- func (m *MockAdminClient) CreateTopicsCalled() bool
- func (m *MockAdminClient) DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) (result []kafka.TopicResult, err error)
- func (m *MockAdminClient) DeleteTopicsCalled() bool
- func (m *MockAdminClient) GetKafkaSecretName(topicName string) string
- type MockConsumer
- func (mc *MockConsumer) Close() error
- func (mc *MockConsumer) Commit() ([]kafka.TopicPartition, error)
- func (mc *MockConsumer) CommitMessage(*kafka.Message) ([]kafka.TopicPartition, error)
- func (mc *MockConsumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
- func (mc *MockConsumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
- func (mc *MockConsumer) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
- func (mc *MockConsumer) Poll(timeout int) kafka.Event
- func (mc *MockConsumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- func (mc *MockConsumer) StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
- func (mc *MockConsumer) Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
Constants ¶
const (
	// Prometheus MetricsPort
	MetricsPortName = "metrics"

	// Environment Test Data
	ServiceAccount = "TestServiceAccount"
	MetricsPort = 9876
	HealthPort = 8082
	KafkaOffsetCommitMessageCount = 99
	KafkaOffsetCommitDurationMillis = 9999
	ChannelImage = "TestChannelImage"
	ChannelReplicas = 1
	DispatcherImage = "TestDispatcherImage"
	DispatcherReplicas = 1
	DefaultNumPartitions = 4
	DefaultReplicationFactor = 1
	DefaultRetentionMillis = 99999
	DefaultEventRetryInitialIntervalMillis = 88888
	DefaultEventRetryTimeMillisMax = 11111111
	DefaultExponentialBackoff = true

	// Channel Test Data
	KafkaChannelNamespace = "kafkachannel-namespace"
	KafkaChannelName = "kafkachannel-name"
	KafkaChannelKey = KafkaChannelNamespace + "/" + KafkaChannelName
	KafkaSecretNamespace = constants.KnativeEventingNamespace // Needs To Match Hardcoded Value In Reconciliation
	KafkaSecretName = "kafkasecret-name"
	KafkaSecretKey = KafkaSecretNamespace + "/" + KafkaSecretName
	ChannelDeploymentName = KafkaSecretName + "-channel"
	TopicName = KafkaChannelNamespace + "." + KafkaChannelName
	KafkaSecretDataValueBrokers = "TestKafkaSecretDataBrokers"
	KafkaSecretDataValueUsername = "TestKafkaSecretDataUsername"
	KafkaSecretDataValuePassword = "TestKafkaSecretDataPassword"

	// ChannelSpec Test Data
	NumPartitions = 123
	ReplicationFactor = 456

	// Test MetaData
	ErrorString = "Expected Mock Test Error"

	// Test Dispatcher Resources
	DispatcherMemoryRequest = "20Mi"
	DispatcherCpuRequest = "100m"
	DispatcherMemoryLimit = "50Mi"
	DispatcherCpuLimit = "300m"

	// Test Channel Resources
	ChannelMemoryRequest = "10Mi"
	ChannelMemoryLimit = "20Mi"
	ChannelCpuRequest = "10m"
	ChannelCpuLimit = "100m"
)
Constants
Variables ¶
This section is empty.
Functions ¶
func MakeFactory ¶
MakeFactory creates a reconciler factory with fake clients and controller created by `ctor`.
func NewChannelOwnerRef ¶
func NewChannelOwnerRef() metav1.OwnerReference
Utility Function For Creating A New OwnerReference Model For The Test Channel
func NewFinalizerPatchActionImpl ¶
func NewFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
Utility Function For Creating A PatchActionImpl For The Finalizer Patch Command
func NewKafkaChannel ¶
func NewKafkaChannel(options ...KafkaChannelOption) *kafkav1alpha1.KafkaChannel
Utility Function For Creating A Custom KafkaChannel For Testing
func NewKafkaChannelChannelDeployment ¶
func NewKafkaChannelChannelDeployment() *appsv1.Deployment
Utility Function For Creating A K8S Channel Deployment For The Test Channel
func NewKafkaChannelChannelService ¶
Utility Function For Creating A Custom KafkaChannel "Deployment" Service For Testing
func NewKafkaChannelDispatcherDeployment ¶
func NewKafkaChannelDispatcherDeployment() *appsv1.Deployment
Utility Function For Creating A Custom KafkaChannel Dispatcher Deployment For Testing
func NewKafkaChannelDispatcherService ¶
Utility Function For Creating A Custom KafkaChannel Dispatcher Service For Testing
func NewKafkaChannelFailedReconciliationEvent ¶
func NewKafkaChannelFailedReconciliationEvent() string
Utility Function For Creating A Failed KafkaChannel Reconciled Event
func NewKafkaChannelFinalizerUpdateEvent ¶
func NewKafkaChannelFinalizerUpdateEvent() string
Utility Function For Creating A Successful KafkaChannel Finalizer Update Event
func NewKafkaChannelLabelUpdate ¶
func NewKafkaChannelLabelUpdate(kafkachannel *kafkav1alpha1.KafkaChannel) clientgotesting.UpdateActionImpl
Utility Function For Creating An UpdateActionImpl For The KafkaChannel Labels Update Command
func NewKafkaChannelService ¶
Utility Function For Creating A Custom KafkaChannel "Channel" Service For Testing
func NewKafkaChannelSuccessfulFinalizedEvent ¶
func NewKafkaChannelSuccessfulFinalizedEvent() string
Utility Function For Creating A Successful KafkaChannel Finalized Event
func NewKafkaChannelSuccessfulReconciliationEvent ¶
func NewKafkaChannelSuccessfulReconciliationEvent() string
Utility Function For Creating A Successful KafkaChannel Reconciled Event
func NewKafkaSecret ¶
func NewKafkaSecret(options ...KafkaSecretOption) *corev1.Secret
Create A New Kafka Auth Secret For Testing
func NewKafkaSecretFailedReconciliationEvent ¶
func NewKafkaSecretFailedReconciliationEvent() string
Utility Function For Creating A Failed Kafka Secret Reconciled Event
func NewKafkaSecretFinalizerPatchActionImpl ¶
func NewKafkaSecretFinalizerPatchActionImpl() clientgotesting.PatchActionImpl
Utility Function For Creating A PatchActionImpl For The Kafka Secret Finalizer Patch Command
func NewKafkaSecretFinalizerUpdateEvent ¶
func NewKafkaSecretFinalizerUpdateEvent() string
Utility Function For Creating A Successful Kafka Secret Finalizer Update Event
func NewKafkaSecretSuccessfulFinalizedEvent ¶
func NewKafkaSecretSuccessfulFinalizedEvent() string
Utility Function For Creating A Successful Kafka Secret Finalized Event
func NewKafkaSecretSuccessfulReconciliationEvent ¶
func NewKafkaSecretSuccessfulReconciliationEvent() string
Utility Function For Creating A Successful Kafka Secret Reconciled Event
func NewSecretOwnerRef ¶
func NewSecretOwnerRef() metav1.OwnerReference
Utility Function For Creating A New OwnerReference Model For The Test Kafka Secret
func WithAddress ¶
func WithAddress(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Address
func WithChannelDeploymentFailed ¶
func WithChannelDeploymentFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Channel Deployment As Failed
func WithChannelDeploymentFinalized ¶
func WithChannelDeploymentFinalized(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Channel Deployment As Finalized
func WithChannelDeploymentReady ¶
func WithChannelDeploymentReady(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Channel Deployment As READY
func WithChannelServiceFailed ¶
func WithChannelServiceFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Channel Service As Failed
func WithChannelServiceFinalized ¶
func WithChannelServiceFinalized(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Channel Service As Finalized
func WithChannelServiceReady ¶
func WithChannelServiceReady(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Channel Service As READY
func WithDeletionTimestamp ¶
func WithDeletionTimestamp(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's DeletionTimestamp To Current Time
func WithDispatcherDeploymentReady ¶
func WithDispatcherDeploymentReady(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Dispatcher Deployment As READY
func WithDispatcherFailed ¶
func WithDispatcherFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Dispatcher Deployment As Failed
func WithFinalizer ¶
func WithFinalizer(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Finalizer
func WithInitializedConditions ¶
func WithInitializedConditions(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Status To Initialized State
func WithKafkaChannelServiceFailed ¶
func WithKafkaChannelServiceFailed(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Service As Failed
func WithKafkaChannelServiceReady ¶
func WithKafkaChannelServiceReady(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Service As READY
func WithKafkaSecretDeleted ¶
Set The Kafka Secret's DeletionTimestamp To Current Time
func WithKafkaSecretFinalizer ¶
Set The Kafka Secret's Finalizer
func WithLabels ¶
func WithLabels(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Labels
func WithTopicReady ¶
func WithTopicReady(kafkachannel *kafkav1alpha1.KafkaChannel)
Set The KafkaChannel's Topic As READY
Types ¶
type Ctor ¶
type Ctor func(context.Context, *Listers, configmap.Watcher) controller.Reconciler
Ctor functions create a k8s controller with given params.
type KafkaChannelOption ¶
type KafkaChannelOption func(*kafkav1alpha1.KafkaChannel)
KafkaChannelOption Enables Customization Of A KafkaChannel
type KafkaSecretOption ¶
KafkaSecretOption Enables Customization Of A Kafka Secret
type Listers ¶
type Listers struct {
// contains filtered or unexported fields
}
func NewListers ¶
func (*Listers) GetAllObjects ¶
func (*Listers) GetDeploymentLister ¶
func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister
func (*Listers) GetEndpointsLister ¶
func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister
func (*Listers) GetEventingObjects ¶
func (*Listers) GetEventsObjects ¶
func (*Listers) GetKafkaChannelLister ¶
func (l *Listers) GetKafkaChannelLister() kafkalisters.KafkaChannelLister
func (*Listers) GetKafkaChannelObjects ¶
func (*Listers) GetKubeObjects ¶
func (*Listers) GetSecretLister ¶
func (l *Listers) GetSecretLister() corev1listers.SecretLister
func (*Listers) GetServiceLister ¶
func (l *Listers) GetServiceLister() corev1listers.ServiceLister
type MockAdminClient ¶
type MockAdminClient struct {
	MockCreateTopicFunc func(context.Context, []kafka.TopicSpecification, ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
	MockDeleteTopicFunc func(context.Context, []string, ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error)
	// contains filtered or unexported fields
}
Mock Kafka AdminClient Implementation
func (*MockAdminClient) Close ¶
func (m *MockAdminClient) Close()
Mock Kafka AdminClient Close Function - NoOp
func (*MockAdminClient) CreateTopics ¶
func (m *MockAdminClient) CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) (result []kafka.TopicResult, err error)
Mock Kafka AdminClient CreateTopics Function - Calls Custom CreateTopics If Specified, Otherwise Returns Success
func (*MockAdminClient) CreateTopicsCalled ¶
func (m *MockAdminClient) CreateTopicsCalled() bool
Check On Calls To CreateTopics()
func (*MockAdminClient) DeleteTopics ¶
func (m *MockAdminClient) DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) (result []kafka.TopicResult, err error)
Mock Kafka AdminClient DeleteTopics Function - Calls Custom DeleteTopics If Specified, Otherwise Returns Success
func (*MockAdminClient) DeleteTopicsCalled ¶
func (m *MockAdminClient) DeleteTopicsCalled() bool
Check On Calls To DeleteTopics()
func (*MockAdminClient) GetKafkaSecretName ¶
func (m *MockAdminClient) GetKafkaSecretName(topicName string) string
Mock Kafka Secret Name Function - Return Test Data
type MockConsumer ¶
type MockConsumer struct {
// contains filtered or unexported fields
}
func (*MockConsumer) Close ¶
func (mc *MockConsumer) Close() error
func (*MockConsumer) Commit ¶
func (mc *MockConsumer) Commit() ([]kafka.TopicPartition, error)
func (*MockConsumer) CommitMessage ¶
func (mc *MockConsumer) CommitMessage(*kafka.Message) ([]kafka.TopicPartition, error)
func (*MockConsumer) CommitOffsets ¶
func (mc *MockConsumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
func (*MockConsumer) GetMetadata ¶
func (*MockConsumer) OffsetsForTimes ¶
func (mc *MockConsumer) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
func (*MockConsumer) QueryWatermarkOffsets ¶
func (*MockConsumer) StoreOffsets ¶
func (mc *MockConsumer) StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
func (*MockConsumer) Subscribe ¶
func (mc *MockConsumer) Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error