Documentation ¶
Index ¶
- Constants
- Variables
- func Addressable(configs *Configs) func(broker *eventing.Broker)
- func BrokerConfig(bootstrapServers string, numPartitions, replicationFactor int) *corev1.ConfigMap
- func BrokerReady(broker *eventing.Broker)
- func ConfigMapUpdate(configs *Configs, brokers *coreconfig.Brokers) clientgotesting.UpdateActionImpl
- func ConfigMapUpdatedReady(configs *Configs) func(broker *eventing.Broker)
- func ConfigNotParsed(reason string) func(broker *eventing.Broker)
- func ConfigParsed(broker *eventing.Broker)
- func DispatcherPodUpdate(namespace string, annotations map[string]string) clientgotesting.UpdateActionImpl
- func FailedToCreateTopic(broker *eventing.Broker)
- func FailedToGetConfigMap(configs *Configs) func(broker *eventing.Broker)
- func GetTopic() string
- func KReference(configMap *corev1.ConfigMap) *duckv1.KReference
- func NewBroker(options ...reconcilertesting.BrokerOption) runtime.Object
- func NewConfigMap(configs *Configs, data []byte) runtime.Object
- func NewConfigMapFromBrokers(brokers *coreconfig.Brokers, configs *Configs) runtime.Object
- func NewDeletedBroker(options ...reconcilertesting.BrokerOption) runtime.Object
- func NewDispatcherPod(namespace string, annotations map[string]string) runtime.Object
- func NewFactory(configs *broker.Configs, ctor Ctor) Factory
- func NewReceiverPod(namespace string, annotations map[string]string) runtime.Object
- func NewService() *corev1.Service
- func ReceiverPodUpdate(namespace string, annotations map[string]string) clientgotesting.UpdateActionImpl
- func TopicReady(broker *eventing.Broker)
- func WithBrokerConfig(reference *duckv1.KReference) func(*eventing.Broker)
- func WithDelivery() func(*eventing.Broker)
- type Ctor
- type Listers
- func (l *Listers) GetAllObjects() []runtime.Object
- func (l *Listers) GetBrokerLister() eventinglisters.BrokerLister
- func (l *Listers) GetConfigMapLister() corelisters.ConfigMapLister
- func (l *Listers) GetEventingObjects() []runtime.Object
- func (l *Listers) GetKubeObjects() []runtime.Object
- func (l *Listers) GetPodLister() corelisters.PodLister
- func (l *Listers) GetTriggerLister() eventinglisters.TriggerLister
- type MockKafkaClusterAdmin
- func (m MockKafkaClusterAdmin) AlterConfig(resourceType sarama.ConfigResourceType, name string, ...) error
- func (m MockKafkaClusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error
- func (m MockKafkaClusterAdmin) Close() error
- func (m MockKafkaClusterAdmin) CreateACL(resource sarama.Resource, acl sarama.Acl) error
- func (m MockKafkaClusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
- func (m MockKafkaClusterAdmin) CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error
- func (m MockKafkaClusterAdmin) DeleteACL(filter sarama.AclFilter, validateOnly bool) ([]sarama.MatchingAcl, error)
- func (m MockKafkaClusterAdmin) DeleteConsumerGroup(group string) error
- func (m MockKafkaClusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error
- func (m MockKafkaClusterAdmin) DeleteTopic(topic string) error
- func (m MockKafkaClusterAdmin) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error)
- func (m MockKafkaClusterAdmin) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error)
- func (m MockKafkaClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error)
- func (m MockKafkaClusterAdmin) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)
- func (m MockKafkaClusterAdmin) DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error)
- func (m MockKafkaClusterAdmin) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error)
- func (m MockKafkaClusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error)
- func (m MockKafkaClusterAdmin) ListConsumerGroups() (map[string]string, error)
- func (m MockKafkaClusterAdmin) ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, ...)
- func (m MockKafkaClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error)
Constants ¶
View Source
const ( BrokerNamespace = "test-namespace" BrokerName = "test-broker" ConfigMapNamespace = "test-namespace-config-map" ConfigMapName = "test-config-cm" ServiceURL = "http://test-service.test-service-namespace.svc.cluster.local/" BrokerUUID = "e7185016-5d98-4b54-84e8-3b1cd4acc6b4" TriggerUUID = "e7185016-5d98-4b54-84e8-3b1cd4acc6b5" )
Variables ¶
View Source
var DefaultConfigs = &broker.Configs{ EnvConfigs: broker.EnvConfigs{ DataPlaneConfigMapName: "kafka-broker-brokers-triggers", DataPlaneConfigMapNamespace: "knative-eventing", BrokerIngressName: "kafka-broker-receiver", SystemNamespace: "knative-eventing", DataPlaneConfigFormat: base.Json, }, BootstrapServers: "", }
Functions ¶
func Addressable ¶
func BrokerConfig ¶
func BrokerReady ¶
func ConfigMapUpdate ¶
func ConfigMapUpdate(configs *Configs, brokers *coreconfig.Brokers) clientgotesting.UpdateActionImpl
func ConfigMapUpdatedReady ¶
func ConfigNotParsed ¶
func ConfigParsed ¶
func DispatcherPodUpdate ¶
func DispatcherPodUpdate(namespace string, annotations map[string]string) clientgotesting.UpdateActionImpl
func FailedToCreateTopic ¶
func FailedToGetConfigMap ¶
func KReference ¶
func KReference(configMap *corev1.ConfigMap) *duckv1.KReference
func NewBroker ¶
func NewBroker(options ...reconcilertesting.BrokerOption) runtime.Object
NewBroker creates a new Broker with broker class equals to kafka.BrokerClass
func NewConfigMap ¶
func NewConfigMapFromBrokers ¶
func NewConfigMapFromBrokers(brokers *coreconfig.Brokers, configs *Configs) runtime.Object
func NewDeletedBroker ¶
func NewDeletedBroker(options ...reconcilertesting.BrokerOption) runtime.Object
func NewDispatcherPod ¶
func NewFactory ¶
func NewReceiverPod ¶
func NewService ¶
func ReceiverPodUpdate ¶
func ReceiverPodUpdate(namespace string, annotations map[string]string) clientgotesting.UpdateActionImpl
func TopicReady ¶
func WithBrokerConfig ¶
func WithBrokerConfig(reference *duckv1.KReference) func(*eventing.Broker)
func WithDelivery ¶
Types ¶
type Ctor ¶
type Ctor func(ctx context.Context, listers *Listers, configs *broker.Configs, row *TableRow) pkgcontroller.Reconciler
Ctor functions create a k8s controller with given params.
type Listers ¶
type Listers struct {
// contains filtered or unexported fields
}
func (*Listers) GetAllObjects ¶
func (*Listers) GetBrokerLister ¶
func (l *Listers) GetBrokerLister() eventinglisters.BrokerLister
func (*Listers) GetConfigMapLister ¶
func (l *Listers) GetConfigMapLister() corelisters.ConfigMapLister
func (*Listers) GetEventingObjects ¶
func (*Listers) GetKubeObjects ¶
func (*Listers) GetPodLister ¶
func (l *Listers) GetPodLister() corelisters.PodLister
func (*Listers) GetTriggerLister ¶
func (l *Listers) GetTriggerLister() eventinglisters.TriggerLister
type MockKafkaClusterAdmin ¶
type MockKafkaClusterAdmin struct { // (Create|Delete)Topic ExpectedTopicName string // CreateTopic ExpectedTopicDetail sarama.TopicDetail ErrorOnCreateTopic error // DeleteTopic ErrorOnDeleteTopic error T *testing.T }
func (MockKafkaClusterAdmin) AlterConfig ¶
func (m MockKafkaClusterAdmin) AlterConfig(resourceType sarama.ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
func (MockKafkaClusterAdmin) AlterPartitionReassignments ¶
func (m MockKafkaClusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error
func (MockKafkaClusterAdmin) Close ¶
func (m MockKafkaClusterAdmin) Close() error
func (MockKafkaClusterAdmin) CreatePartitions ¶
func (MockKafkaClusterAdmin) CreateTopic ¶
func (m MockKafkaClusterAdmin) CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error
func (MockKafkaClusterAdmin) DeleteACL ¶
func (m MockKafkaClusterAdmin) DeleteACL(filter sarama.AclFilter, validateOnly bool) ([]sarama.MatchingAcl, error)
func (MockKafkaClusterAdmin) DeleteConsumerGroup ¶
func (m MockKafkaClusterAdmin) DeleteConsumerGroup(group string) error
func (MockKafkaClusterAdmin) DeleteRecords ¶
func (m MockKafkaClusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error
func (MockKafkaClusterAdmin) DeleteTopic ¶
func (m MockKafkaClusterAdmin) DeleteTopic(topic string) error
func (MockKafkaClusterAdmin) DescribeCluster ¶
func (m MockKafkaClusterAdmin) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error)
func (MockKafkaClusterAdmin) DescribeConfig ¶
func (m MockKafkaClusterAdmin) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error)
func (MockKafkaClusterAdmin) DescribeConsumerGroups ¶
func (m MockKafkaClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error)
func (MockKafkaClusterAdmin) DescribeLogDirs ¶
func (m MockKafkaClusterAdmin) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error)
func (MockKafkaClusterAdmin) DescribeTopics ¶
func (m MockKafkaClusterAdmin) DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error)
func (MockKafkaClusterAdmin) ListAcls ¶
func (m MockKafkaClusterAdmin) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error)
func (MockKafkaClusterAdmin) ListConsumerGroupOffsets ¶
func (m MockKafkaClusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error)
func (MockKafkaClusterAdmin) ListConsumerGroups ¶
func (m MockKafkaClusterAdmin) ListConsumerGroups() (map[string]string, error)
func (MockKafkaClusterAdmin) ListPartitionReassignments ¶
func (m MockKafkaClusterAdmin) ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, err error)
func (MockKafkaClusterAdmin) ListTopics ¶
func (m MockKafkaClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error)
Click to show internal directories.
Click to hide internal directories.