Documentation ¶
Index ¶
- Constants
- Variables
- func MakeFactory(ctor Ctor, logger *zap.Logger) Factory
- func NewResetOffset(options ...ResetOffsetOption) *kafkav1alpha1.ResetOffset
- func NewResetOffsetNamespacedName() types.NamespacedName
- func WithDeletionTimestamp(resetOffset *kafkav1alpha1.ResetOffset)
- func WithFinalizer(resetOffset *kafkav1alpha1.ResetOffset)
- func WithStatusInitialized(resetOffset *kafkav1alpha1.ResetOffset)
- type Ctor
- type Listers
- type MockClient
- func (c *MockClient) Broker(brokerID int32) (*sarama.Broker, error)
- func (c *MockClient) Brokers() []*sarama.Broker
- func (c *MockClient) Close() error
- func (c *MockClient) Closed() bool
- func (c *MockClient) Config() *sarama.Config
- func (c *MockClient) Controller() (*sarama.Broker, error)
- func (c *MockClient) Coordinator(consumerGroup string) (*sarama.Broker, error)
- func (c *MockClient) GetOffset(topic string, partitionID int32, time int64) (int64, error)
- func (c *MockClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
- func (c *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
- func (c *MockClient) Leader(topic string, partitionID int32) (*sarama.Broker, error)
- func (c *MockClient) OfflineReplicas(topic string, partitionID int32) ([]int32, error)
- func (c *MockClient) Partitions(topic string) ([]int32, error)
- func (c *MockClient) RefreshBrokers(addrs []string) error
- func (c *MockClient) RefreshController() (*sarama.Broker, error)
- func (c *MockClient) RefreshCoordinator(consumerGroup string) error
- func (c *MockClient) RefreshMetadata(topics ...string) error
- func (c *MockClient) Replicas(topic string, partitionID int32) ([]int32, error)
- func (c *MockClient) Topics() ([]string, error)
- func (c *MockClient) WritablePartitions(topic string) ([]int32, error)
- type MockClientOption
- func WithClientMockClose(err error) MockClientOption
- func WithClientMockClosed(closed bool) MockClientOption
- func WithClientMockGetOffset(topic string, partition int32, offsetTime int64, offset int64, err error) MockClientOption
- func WithClientMockPartitions(topic string, partitions []int32, err error) MockClientOption
- type MockOffsetManager
- type MockOffsetManagerOption
- type MockPartitionOffsetManager
- func (p *MockPartitionOffsetManager) AsyncClose()
- func (p *MockPartitionOffsetManager) Close() error
- func (p *MockPartitionOffsetManager) Errors() <-chan *sarama.ConsumerError
- func (p *MockPartitionOffsetManager) MarkOffset(offset int64, metadata string)
- func (p *MockPartitionOffsetManager) NextOffset() (int64, string)
- func (p *MockPartitionOffsetManager) ResetOffset(offset int64, metadata string)
- type MockPartitionOffsetManagerOption
- func WithPartitionOffsetManagerMockAsyncClose() MockPartitionOffsetManagerOption
- func WithPartitionOffsetManagerMockClose(err error) MockPartitionOffsetManagerOption
- func WithPartitionOffsetManagerMockErrors(errors ...*sarama.ConsumerError) MockPartitionOffsetManagerOption
- func WithPartitionOffsetManagerMockMarkOffset(offset int64, metadata string) MockPartitionOffsetManagerOption
- func WithPartitionOffsetManagerMockNextOffset(offset int64, metadata string) MockPartitionOffsetManagerOption
- func WithPartitionOffsetManagerMockResetOffset(offset int64, metadata string) MockPartitionOffsetManagerOption
- type MockPodLister
- type MockPodNamespaceLister
- type ResetOffsetOption
- func WithSpecOffsetTime(time string) ResetOffsetOption
- func WithSpecRef(ref *duckv1.KReference) ResetOffsetOption
- func WithStatusAcquireDataPlaneServices(state bool, failed ...string) ResetOffsetOption
- func WithStatusConsumerGroupsStarted(state bool, failed ...string) ResetOffsetOption
- func WithStatusConsumerGroupsStopped(state bool, failed ...string) ResetOffsetOption
- func WithStatusGroup(group string) ResetOffsetOption
- func WithStatusOffsetsUpdated(state bool, failed ...string) ResetOffsetOption
- func WithStatusPartitions(partitions []kafkav1alpha1.OffsetMapping) ResetOffsetOption
- func WithStatusRefMapped(state bool, failed ...string) ResetOffsetOption
- func WithStatusTopic(topic string) ResetOffsetOption
Constants ¶
View Source
const (
	ResetOffsetNamespace = "resetoffset-namespace"
	ResetOffsetName      = "resetoffset-name"
	ResetOffsetKey       = ResetOffsetNamespace + "/" + ResetOffsetName
	ResetOffsetFinalizer = "resetoffsets.kafka.eventing.knative.dev"
	RefAPIVersion        = "ref-apiversion"
	RefKind              = "ref-kind"
	RefNamespace         = "ref-namespace"
	RefName              = "ref-name"
	Brokers              = "TestKafkaBrokers"
	TopicName            = "TestTopicName"
	GroupId              = "TestGroupId"
)
Variables ¶
View Source
var DeletionTimestamp = metav1.Now()
Functions ¶
func MakeFactory ¶
func MakeFactory(ctor Ctor, logger *zap.Logger) Factory
MakeFactory creates a reconciler factory with fake clients and a controller created by `ctor`.
func NewResetOffset ¶
func NewResetOffset(options ...ResetOffsetOption) *kafkav1alpha1.ResetOffset
NewResetOffset creates a custom ResetOffset.
func NewResetOffsetNamespacedName ¶
func NewResetOffsetNamespacedName() types.NamespacedName
func WithDeletionTimestamp ¶
func WithDeletionTimestamp(resetOffset *kafkav1alpha1.ResetOffset)
func WithFinalizer ¶
func WithFinalizer(resetOffset *kafkav1alpha1.ResetOffset)
func WithStatusInitialized ¶
func WithStatusInitialized(resetOffset *kafkav1alpha1.ResetOffset)
Types ¶
type Ctor ¶
type Ctor func(context.Context, *Listers, configmap.Watcher, map[string]interface{}) controller.Reconciler
Ctor functions create a Kubernetes controller with the given parameters.
type Listers ¶
type Listers struct {
// contains filtered or unexported fields
}
func NewListers ¶
func (*Listers) GetAllObjects ¶
func (*Listers) GetEventingObjects ¶
func (*Listers) GetKubeObjects ¶
func (*Listers) GetResetOffsetLister ¶
func (l *Listers) GetResetOffsetLister() resetoffsetlisters.ResetOffsetLister
func (*Listers) GetResetOffsetObjects ¶
type MockClient ¶
func NewMockClient ¶
func NewMockClient(options ...MockClientOption) *MockClient
func (*MockClient) Brokers ¶
func (c *MockClient) Brokers() []*sarama.Broker
func (*MockClient) Close ¶
func (c *MockClient) Close() error
func (*MockClient) Closed ¶
func (c *MockClient) Closed() bool
func (*MockClient) Config ¶
func (c *MockClient) Config() *sarama.Config
func (*MockClient) Controller ¶
func (c *MockClient) Controller() (*sarama.Broker, error)
func (*MockClient) Coordinator ¶
func (c *MockClient) Coordinator(consumerGroup string) (*sarama.Broker, error)
func (*MockClient) InSyncReplicas ¶
func (c *MockClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
func (*MockClient) InitProducerID ¶
func (c *MockClient) InitProducerID() (*sarama.InitProducerIDResponse, error)
func (*MockClient) OfflineReplicas ¶
func (c *MockClient) OfflineReplicas(topic string, partitionID int32) ([]int32, error)
func (*MockClient) Partitions ¶
func (c *MockClient) Partitions(topic string) ([]int32, error)
func (*MockClient) RefreshBrokers ¶
func (c *MockClient) RefreshBrokers(addrs []string) error
func (*MockClient) RefreshController ¶
func (c *MockClient) RefreshController() (*sarama.Broker, error)
func (*MockClient) RefreshCoordinator ¶
func (c *MockClient) RefreshCoordinator(consumerGroup string) error
func (*MockClient) RefreshMetadata ¶
func (c *MockClient) RefreshMetadata(topics ...string) error
func (*MockClient) Replicas ¶
func (c *MockClient) Replicas(topic string, partitionID int32) ([]int32, error)
func (*MockClient) Topics ¶
func (c *MockClient) Topics() ([]string, error)
func (*MockClient) WritablePartitions ¶
func (c *MockClient) WritablePartitions(topic string) ([]int32, error)
type MockClientOption ¶
type MockClientOption = func(*MockClient)
func WithClientMockClose ¶
func WithClientMockClose(err error) MockClientOption
func WithClientMockClosed ¶
func WithClientMockClosed(closed bool) MockClientOption
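func WithClientMockGetOffset ¶
func WithClientMockGetOffset(topic string, partition int32, offsetTime int64, offset int64, err error) MockClientOption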
func WithClientMockPartitions ¶
func WithClientMockPartitions(topic string, partitions []int32, err error) MockClientOption
type MockOffsetManager ¶
func NewMockOffsetManager ¶
func NewMockOffsetManager(options ...MockOffsetManagerOption) *MockOffsetManager
func (*MockOffsetManager) Close ¶
func (o *MockOffsetManager) Close() error
func (*MockOffsetManager) Commit ¶
func (o *MockOffsetManager) Commit()
func (*MockOffsetManager) ManagePartition ¶
func (o *MockOffsetManager) ManagePartition(topic string, partition int32) (sarama.PartitionOffsetManager, error)
type MockOffsetManagerOption ¶
type MockOffsetManagerOption = func(*MockOffsetManager)
func WithOffsetManagerMockClose ¶
func WithOffsetManagerMockClose(err error) MockOffsetManagerOption
func WithOffsetManagerMockCommit ¶
func WithOffsetManagerMockCommit() MockOffsetManagerOption
func WithOffsetManagerMockManagePartition ¶
func WithOffsetManagerMockManagePartition(topic string, partition int32, partitionOffsetManager sarama.PartitionOffsetManager, err error) MockOffsetManagerOption
type MockPartitionOffsetManager ¶
func NewMockPartitionOffsetManager ¶
func NewMockPartitionOffsetManager(options ...MockPartitionOffsetManagerOption) *MockPartitionOffsetManager
func (*MockPartitionOffsetManager) AsyncClose ¶
func (p *MockPartitionOffsetManager) AsyncClose()
func (*MockPartitionOffsetManager) Close ¶
func (p *MockPartitionOffsetManager) Close() error
func (*MockPartitionOffsetManager) Errors ¶
func (p *MockPartitionOffsetManager) Errors() <-chan *sarama.ConsumerError
func (*MockPartitionOffsetManager) MarkOffset ¶
func (p *MockPartitionOffsetManager) MarkOffset(offset int64, metadata string)
func (*MockPartitionOffsetManager) NextOffset ¶
func (p *MockPartitionOffsetManager) NextOffset() (int64, string)
func (*MockPartitionOffsetManager) ResetOffset ¶
func (p *MockPartitionOffsetManager) ResetOffset(offset int64, metadata string)
type MockPartitionOffsetManagerOption ¶
type MockPartitionOffsetManagerOption = func(*MockPartitionOffsetManager)
func WithPartitionOffsetManagerMockAsyncClose ¶
func WithPartitionOffsetManagerMockAsyncClose() MockPartitionOffsetManagerOption
func WithPartitionOffsetManagerMockClose ¶
func WithPartitionOffsetManagerMockClose(err error) MockPartitionOffsetManagerOption
func WithPartitionOffsetManagerMockErrors ¶
func WithPartitionOffsetManagerMockErrors(errors ...*sarama.ConsumerError) MockPartitionOffsetManagerOption
func WithPartitionOffsetManagerMockMarkOffset ¶
func WithPartitionOffsetManagerMockMarkOffset(offset int64, metadata string) MockPartitionOffsetManagerOption
func WithPartitionOffsetManagerMockNextOffset ¶
func WithPartitionOffsetManagerMockNextOffset(offset int64, metadata string) MockPartitionOffsetManagerOption
func WithPartitionOffsetManagerMockResetOffset ¶
func WithPartitionOffsetManagerMockResetOffset(offset int64, metadata string) MockPartitionOffsetManagerOption
type MockPodLister ¶
func (*MockPodLister) Pods ¶
func (l *MockPodLister) Pods(namespace string) listerscorev1.PodNamespaceLister
type MockPodNamespaceLister ¶
type ResetOffsetOption ¶
type ResetOffsetOption func(resetOffset *kafkav1alpha1.ResetOffset)
ResetOffsetOption allows for customizing a ResetOffset.
func WithSpecOffsetTime ¶
func WithSpecOffsetTime(time string) ResetOffsetOption
func WithSpecRef ¶
func WithSpecRef(ref *duckv1.KReference) ResetOffsetOption
func WithStatusAcquireDataPlaneServices ¶
func WithStatusAcquireDataPlaneServices(state bool, failed ...string) ResetOffsetOption
func WithStatusConsumerGroupsStarted ¶
func WithStatusConsumerGroupsStarted(state bool, failed ...string) ResetOffsetOption
func WithStatusConsumerGroupsStopped ¶
func WithStatusConsumerGroupsStopped(state bool, failed ...string) ResetOffsetOption
func WithStatusGroup ¶
func WithStatusGroup(group string) ResetOffsetOption
func WithStatusOffsetsUpdated ¶
func WithStatusOffsetsUpdated(state bool, failed ...string) ResetOffsetOption
func WithStatusPartitions ¶
func WithStatusPartitions(partitions []kafkav1alpha1.OffsetMapping) ResetOffsetOption
func WithStatusRefMapped ¶
func WithStatusRefMapped(state bool, failed ...string) ResetOffsetOption
func WithStatusTopic ¶
func WithStatusTopic(topic string) ResetOffsetOption
Click to show internal directories.
Click to hide internal directories.