client

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2020 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConfigEntry

type ConfigEntry struct {
	Name      string
	Value     string
	ReadOnly  bool
	Default   bool
	Source    string
	Sensitive bool
	Synonyms  []*ConfigSynonym
}

type ConfigResource

type ConfigResource struct {
	Type        int
	Name        string
	ConfigNames []string
}

type ConfigSynonym

type ConfigSynonym struct {
	ConfigName  string
	ConfigValue string
	Source      string
}

type Configurer

type Configurer interface {
	GetConfig(topic string) ([]ConfigEntry, error)
	UpdateConfig(topics []string, configMap map[string]*string, validateOnly bool) error
}

type ConsumerLister added in v0.1.0

type ConsumerLister interface {
	ListConsumerGroups() (map[string]string, error)
	GetConsumerGroupsForTopic([]string, string) (chan string, error)
}

type Creator

type Creator interface {
	Create(topic string, detail TopicDetail, validateOnly bool) error
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
}

type Deleter

type Deleter interface {
	Delete(topics []string) error
}

type Describer

type Describer interface {
	Describe(topics []string) ([]*TopicMetadata, error)
}

type KafkaAPIClient

type KafkaAPIClient interface {
	CreateTopic(topic string, detail TopicDetail, validateOnly bool) error
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
	ListBrokers() map[int]string
	ListTopicDetails() (map[string]TopicDetail, error)
	DeleteTopic(topics []string) error
	DescribeTopicMetadata(topics []string) ([]*TopicMetadata, error)
	UpdateConfig(resourceType int, name string, entries map[string]*string, validateOnly bool) error
	GetTopicResourceType() int
	GetConfig(resource ConfigResource) ([]ConfigEntry, error)
}

type KafkaSSHClient

type KafkaSSHClient interface {
	ListTopics(ListTopicsRequest) ([]string, error)
}

func NewKafkaRemoteClient

func NewKafkaRemoteClient(apiClient KafkaAPIClient, sshClient sshCli) (KafkaSSHClient, error)

type ListTopicsRequest

type ListTopicsRequest struct {
	LastWritten int64
	DataDir     string
}

type Lister

type Lister interface {
	List() (map[string]TopicDetail, error)
	ListLastWrittenTopics(int64, string) ([]string, error)
	ListOnly(regex string, include bool) ([]string, error)
}

type MockClusterAdmin

type MockClusterAdmin struct {
	mock.Mock
}

func (*MockClusterAdmin) AlterConfig

func (m *MockClusterAdmin) AlterConfig(resourceType sarama.ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

func (*MockClusterAdmin) Close

func (m *MockClusterAdmin) Close() error

func (*MockClusterAdmin) CreateACL

func (m *MockClusterAdmin) CreateACL(resource sarama.Resource, acl sarama.Acl) error

func (*MockClusterAdmin) CreatePartitions

func (m *MockClusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

func (*MockClusterAdmin) CreateTopic

func (m *MockClusterAdmin) CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error

func (*MockClusterAdmin) DeleteACL

func (m *MockClusterAdmin) DeleteACL(filter sarama.AclFilter, validateOnly bool) ([]sarama.MatchingAcl, error)

func (*MockClusterAdmin) DeleteConsumerGroup

func (m *MockClusterAdmin) DeleteConsumerGroup(group string) error

func (*MockClusterAdmin) DeleteRecords

func (m *MockClusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error

func (*MockClusterAdmin) DeleteTopic

func (m *MockClusterAdmin) DeleteTopic(topic string) error

func (*MockClusterAdmin) DescribeCluster

func (m *MockClusterAdmin) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error)

func (*MockClusterAdmin) DescribeConfig

func (m *MockClusterAdmin) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error)

func (*MockClusterAdmin) DescribeConsumerGroups

func (m *MockClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error)

func (*MockClusterAdmin) DescribeTopics

func (m *MockClusterAdmin) DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error)

func (*MockClusterAdmin) ListAcls

func (m *MockClusterAdmin) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error)

func (*MockClusterAdmin) ListConsumerGroupOffsets

func (m *MockClusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error)

func (*MockClusterAdmin) ListConsumerGroups

func (m *MockClusterAdmin) ListConsumerGroups() (map[string]string, error)

func (*MockClusterAdmin) ListTopics

func (m *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error)

type MockConfigurer

type MockConfigurer struct {
	mock.Mock
}

func (*MockConfigurer) GetConfig

func (m *MockConfigurer) GetConfig(topic string) ([]ConfigEntry, error)

func (*MockConfigurer) UpdateConfig

func (m *MockConfigurer) UpdateConfig(topics []string, configMap map[string]*string, validateOnly bool) error

type MockCreator

type MockCreator struct {
	mock.Mock
}

func (*MockCreator) Create

func (m *MockCreator) Create(topic string, detail TopicDetail, validateOnly bool) error

func (*MockCreator) CreatePartitions

func (m *MockCreator) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

type MockDeleter

type MockDeleter struct {
	mock.Mock
}

func (*MockDeleter) Delete

func (m *MockDeleter) Delete(topics []string) error

type MockDescriber

type MockDescriber struct {
	mock.Mock
}

func (*MockDescriber) Describe

func (m *MockDescriber) Describe(topics []string) ([]*TopicMetadata, error)

type MockKafkaAPIClient

type MockKafkaAPIClient struct {
	mock.Mock
}

func (*MockKafkaAPIClient) CreatePartitions

func (m *MockKafkaAPIClient) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

func (*MockKafkaAPIClient) CreateTopic

func (m *MockKafkaAPIClient) CreateTopic(topic string, detail TopicDetail, validateOnly bool) error

func (*MockKafkaAPIClient) DeleteTopic

func (m *MockKafkaAPIClient) DeleteTopic(topics []string) error

func (*MockKafkaAPIClient) DescribeTopicMetadata

func (m *MockKafkaAPIClient) DescribeTopicMetadata(topics []string) ([]*TopicMetadata, error)

func (*MockKafkaAPIClient) GetConfig

func (m *MockKafkaAPIClient) GetConfig(resource ConfigResource) ([]ConfigEntry, error)

func (*MockKafkaAPIClient) GetTopicResourceType

func (m *MockKafkaAPIClient) GetTopicResourceType() int

func (*MockKafkaAPIClient) ListBrokers

func (m *MockKafkaAPIClient) ListBrokers() map[int]string

func (*MockKafkaAPIClient) ListTopicDetails

func (m *MockKafkaAPIClient) ListTopicDetails() (map[string]TopicDetail, error)

func (*MockKafkaAPIClient) UpdateConfig

func (m *MockKafkaAPIClient) UpdateConfig(resourceType int, name string, entries map[string]*string, validateOnly bool) error

type MockLister

type MockLister struct {
	mock.Mock
}

func (*MockLister) List

func (m *MockLister) List() (map[string]TopicDetail, error)

func (*MockLister) ListLastWrittenTopics

func (m *MockLister) ListLastWrittenTopics(lastWrittenEpoch int64, dataDir string) ([]string, error)

func (*MockLister) ListOnly

func (m *MockLister) ListOnly(regex string, include bool) ([]string, error)

type MockPartitioner

type MockPartitioner struct {
	mock.Mock
}

func (*MockPartitioner) IncreaseReplication

func (m *MockPartitioner) IncreaseReplication(topicsMetadata []*TopicMetadata, replicationFactor, numOfBrokers, batch,
	timeoutPerBatchInS, pollIntervalInS, throttle int) error

func (*MockPartitioner) ReassignPartitions

func (m *MockPartitioner) ReassignPartitions(topics []string, brokerList string, batch, timeoutPerBatchInS, pollIntervalInS, throttle int) error

type MockSSHCli

type MockSSHCli struct {
	mock.Mock
}

func (*MockSSHCli) DialAndExecute

func (m *MockSSHCli) DialAndExecute(address string, commands ...shellCmd) (*bytes.Buffer, error)

type MockSaramaClient

type MockSaramaClient struct {
	mock.Mock
}

func (*MockSaramaClient) Brokers

func (m *MockSaramaClient) Brokers() []*sarama.Broker

func (*MockSaramaClient) Close

func (m *MockSaramaClient) Close() error

func (*MockSaramaClient) Closed

func (m *MockSaramaClient) Closed() bool

func (*MockSaramaClient) Config

func (m *MockSaramaClient) Config() *sarama.Config

func (*MockSaramaClient) Controller

func (m *MockSaramaClient) Controller() (*sarama.Broker, error)

func (*MockSaramaClient) Coordinator

func (m *MockSaramaClient) Coordinator(consumerGroup string) (*sarama.Broker, error)

func (*MockSaramaClient) GetOffset

func (m *MockSaramaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error)

func (*MockSaramaClient) InSyncReplicas

func (m *MockSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)

func (*MockSaramaClient) InitProducerID

func (m *MockSaramaClient) InitProducerID() (*sarama.InitProducerIDResponse, error)

func (*MockSaramaClient) Leader

func (m *MockSaramaClient) Leader(topic string, partitionID int32) (*sarama.Broker, error)

func (*MockSaramaClient) OfflineReplicas

func (m *MockSaramaClient) OfflineReplicas(topic string, partitionID int32) ([]int32, error)

func (*MockSaramaClient) Partitions

func (m *MockSaramaClient) Partitions(topic string) ([]int32, error)

func (*MockSaramaClient) RefreshCoordinator

func (m *MockSaramaClient) RefreshCoordinator(consumerGroup string) error

func (*MockSaramaClient) RefreshMetadata

func (m *MockSaramaClient) RefreshMetadata(topics ...string) error

func (*MockSaramaClient) Replicas

func (m *MockSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error)

func (*MockSaramaClient) Topics

func (m *MockSaramaClient) Topics() ([]string, error)

func (*MockSaramaClient) WritablePartitions

func (m *MockSaramaClient) WritablePartitions(topic string) ([]int32, error)

type PartitionMetadata

type PartitionMetadata struct {
	Err             error
	ID              int32
	Leader          int32
	Replicas        []int32
	Isr             []int32
	OfflineReplicas []int32
}

type Partitioner

type Partitioner interface {
	ReassignPartitions(topics []string, brokerList string, batch, timeoutPerBatchInS, pollIntervalInS, throttle int) error
	IncreaseReplication(topicsMetadata []*TopicMetadata, replicationFactor, numOfBrokers, batch, timeoutPerBatchInS, pollIntervalInS, throttle int) error
}

type SSHClient

type SSHClient struct {
	// contains filtered or unexported fields
}

func NewSSHClient

func NewSSHClient(user, port, keyfile string) (*SSHClient, error)

func (*SSHClient) DialAndExecute

func (s *SSHClient) DialAndExecute(address string, commands ...shellCmd) (*bytes.Buffer, error)

type SaramaClient

type SaramaClient struct {
	// contains filtered or unexported fields
}

func NewSaramaClient

func NewSaramaClient(addr []string) *SaramaClient

func (*SaramaClient) CreatePartitions

func (s *SaramaClient) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

func (*SaramaClient) CreateTopic

func (s *SaramaClient) CreateTopic(topic string, detail TopicDetail, validateOnly bool) error

func (*SaramaClient) DeleteTopic

func (s *SaramaClient) DeleteTopic(topics []string) error

func (*SaramaClient) DescribeTopicMetadata

func (s *SaramaClient) DescribeTopicMetadata(topics []string) ([]*TopicMetadata, error)

func (*SaramaClient) GetConfig

func (s *SaramaClient) GetConfig(resource ConfigResource) ([]ConfigEntry, error)

func (*SaramaClient) GetConsumerGroupsForTopic added in v0.1.0

func (s *SaramaClient) GetConsumerGroupsForTopic(groups []string, topic string) (chan string, error)

func (*SaramaClient) GetTopicResourceType

func (s *SaramaClient) GetTopicResourceType() int

func (*SaramaClient) ListBrokers

func (s *SaramaClient) ListBrokers() map[int]string

func (*SaramaClient) ListConsumerGroups added in v0.1.0

func (s *SaramaClient) ListConsumerGroups() (map[string]string, error)

func (*SaramaClient) ListTopicDetails

func (s *SaramaClient) ListTopicDetails() (map[string]TopicDetail, error)

func (*SaramaClient) UpdateConfig

func (s *SaramaClient) UpdateConfig(resourceType int, name string, entries map[string]*string, validateOnly bool) error

type TopicDetail

type TopicDetail struct {
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	Config            map[string]*string
}

type TopicMetadata

type TopicMetadata struct {
	Err        error
	Name       string
	IsInternal bool
	Partitions []*PartitionMetadata
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL