Versions in this module Expand all Collapse all v1 v1.0.0 Dec 3, 2017 Changes in this version + func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config + func StartCoordinatorModules(modules map[string]protocol.Module) error + func StopCoordinatorModules(modules map[string]protocol.Module) + func TimeoutSendStorageRequest(storageChannel chan *protocol.StorageRequest, request *protocol.StorageRequest, ...) bool + func ValidateEmail(email string) bool + func ValidateFilename(filename string) bool + func ValidateHostList(hosts []string) bool + func ValidateHostPort(host string, allowBlankHost bool) bool + func ValidateHostname(hostname string) bool + func ValidateIP(ipaddr string) bool + func ValidateTopic(topic string) bool + func ValidateURL(rawURL string) bool + func ValidateZookeeperPath(path string) bool + func ZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error) + type BurrowSaramaBroker struct + func (b *BurrowSaramaBroker) Close() error + func (b *BurrowSaramaBroker) GetAvailableOffsets(request *sarama.OffsetRequest) (*sarama.OffsetResponse, error) + func (b *BurrowSaramaBroker) ID() int32 + type BurrowSaramaClient struct + Client sarama.Client + func (c *BurrowSaramaClient) Brokers() []SaramaBroker + func (c *BurrowSaramaClient) Close() error + func (c *BurrowSaramaClient) Closed() bool + func (c *BurrowSaramaClient) Config() *sarama.Config + func (c *BurrowSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error) + func (c *BurrowSaramaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error) + func (c *BurrowSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error) + func (c *BurrowSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error) + func (c *BurrowSaramaClient) NewConsumerFromClient() (sarama.Consumer, error) + func (c *BurrowSaramaClient) Partitions(topic string) ([]int32, error) + func (c *BurrowSaramaClient) RefreshCoordinator(consumerGroup string) error + func (c *BurrowSaramaClient) RefreshMetadata(topics ...string) error + func (c *BurrowSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error) + func (c *BurrowSaramaClient) Topics() ([]string, error) + func (c *BurrowSaramaClient) WritablePartitions(topic string) ([]int32, error) + type BurrowZookeeperClient struct + func (z *BurrowZookeeperClient) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) + func (z *BurrowZookeeperClient) Close() + func (z *BurrowZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) + func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) + func (z *BurrowZookeeperClient) NewLock(path string) protocol.ZookeeperLock + type MockModule struct + func (m *MockModule) AcceptConsumerGroup(status *protocol.ConsumerGroupStatus) bool + func (m *MockModule) Configure(name string, configRoot string) + func (m *MockModule) GetGroupBlacklist() *regexp.Regexp + func (m *MockModule) GetGroupWhitelist() *regexp.Regexp + func (m *MockModule) GetLogger() *zap.Logger + func (m *MockModule) GetName() string + func (m *MockModule) Notify(status *protocol.ConsumerGroupStatus, eventID string, startTime time.Time, ...) + func (m *MockModule) Start() error + func (m *MockModule) Stop() error + type MockSaramaBroker struct + func (m *MockSaramaBroker) Close() error + func (m *MockSaramaBroker) GetAvailableOffsets(request *sarama.OffsetRequest) (*sarama.OffsetResponse, error) + func (m *MockSaramaBroker) ID() int32 + type MockSaramaClient struct + func (m *MockSaramaClient) Brokers() []SaramaBroker + func (m *MockSaramaClient) Close() error + func (m *MockSaramaClient) Closed() bool + func (m *MockSaramaClient) Config() *sarama.Config + func (m *MockSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error) + func (m *MockSaramaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error) + func (m *MockSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error) + func (m *MockSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error) + func (m *MockSaramaClient) NewConsumerFromClient() (sarama.Consumer, error) + func (m *MockSaramaClient) Partitions(topic string) ([]int32, error) + func (m *MockSaramaClient) RefreshCoordinator(consumerGroup string) error + func (m *MockSaramaClient) RefreshMetadata(topics ...string) error + func (m *MockSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error) + func (m *MockSaramaClient) Topics() ([]string, error) + func (m *MockSaramaClient) WritablePartitions(topic string) ([]int32, error) + type MockSaramaConsumer struct + func (m *MockSaramaConsumer) Close() error + func (m *MockSaramaConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) + func (m *MockSaramaConsumer) HighWaterMarks() map[string]map[int32]int64 + func (m *MockSaramaConsumer) Partitions(topic string) ([]int32, error) + func (m *MockSaramaConsumer) Topics() ([]string, error) + type MockSaramaPartitionConsumer struct + func (m *MockSaramaPartitionConsumer) AsyncClose() + func (m *MockSaramaPartitionConsumer) Close() error + func (m *MockSaramaPartitionConsumer) Errors() <-chan *sarama.ConsumerError + func (m *MockSaramaPartitionConsumer) HighWaterMarkOffset() int64 + func (m *MockSaramaPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage + type MockTicker struct + func (m *MockTicker) GetChannel() <-chan time.Time + func (m *MockTicker) Start() + func (m *MockTicker) Stop() + type MockZookeeperClient struct + EventChannel chan zk.Event + InitialError error + Servers []string + SessionTimeout time.Duration + func (m *MockZookeeperClient) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error) + func (m *MockZookeeperClient) Close() + func (m *MockZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error) + func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) + func (m *MockZookeeperClient) MockZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error) + func (m *MockZookeeperClient) NewLock(path string) protocol.ZookeeperLock + type MockZookeeperLock struct + func (m *MockZookeeperLock) Lock() error + func (m *MockZookeeperLock) Unlock() error + type PausableTicker struct + func (ticker *PausableTicker) GetChannel() <-chan time.Time + func (ticker *PausableTicker) Start() + func (ticker *PausableTicker) Stop() + type SaramaBroker interface + Close func() error + GetAvailableOffsets func(*sarama.OffsetRequest) (*sarama.OffsetResponse, error) + ID func() int32 + type SaramaClient interface + Brokers func() []SaramaBroker + Close func() error + Closed func() bool + Config func() *sarama.Config + Coordinator func(consumerGroup string) (SaramaBroker, error) + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + InSyncReplicas func(topic string, partitionID int32) ([]int32, error) + Leader func(topic string, partitionID int32) (SaramaBroker, error) + NewConsumerFromClient func() (sarama.Consumer, error) + Partitions func(topic string) ([]int32, error) + RefreshCoordinator func(consumerGroup string) error + RefreshMetadata func(topics ...string) error + Replicas func(topic string, partitionID int32) ([]int32, error) + Topics func() ([]string, error) + WritablePartitions func(topic string) ([]int32, error) + type Ticker interface + GetChannel func() <-chan time.Time + Start func() + Stop func() + func NewPausableTicker(d time.Duration) Ticker