Documentation ¶
Overview ¶
Package helpers - Common utilities. The helpers subsystem provides common utilities that can be used by all subsystems. This includes utilities for coordinators to start and stop modules, as well as Kafka and Zookeeper client implementations. There are also a number of mocks that are provided for testing purposes only, and should not be used in normal code.
Index ¶
- Variables
- func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config
- func StartCoordinatorModules(modules map[string]protocol.Module) error
- func StopCoordinatorModules(modules map[string]protocol.Module)
- func TimeoutSendStorageRequest(storageChannel chan *protocol.StorageRequest, request *protocol.StorageRequest, ...) bool
- func ValidateEmail(email string) bool
- func ValidateFilename(filename string) bool
- func ValidateHostList(hosts []string) bool
- func ValidateHostPort(host string, allowBlankHost bool) bool
- func ValidateHostname(hostname string) bool
- func ValidateIP(ipaddr string) bool
- func ValidateTopic(topic string) bool
- func ValidateURL(rawURL string) bool
- func ValidateZookeeperPath(path string) bool
- func ZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error)
- type BurrowSaramaBroker
- type BurrowSaramaClient
- func (c *BurrowSaramaClient) Brokers() []SaramaBroker
- func (c *BurrowSaramaClient) Close() error
- func (c *BurrowSaramaClient) Closed() bool
- func (c *BurrowSaramaClient) Config() *sarama.Config
- func (c *BurrowSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error)
- func (c *BurrowSaramaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error)
- func (c *BurrowSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
- func (c *BurrowSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error)
- func (c *BurrowSaramaClient) NewConsumerFromClient() (sarama.Consumer, error)
- func (c *BurrowSaramaClient) Partitions(topic string) ([]int32, error)
- func (c *BurrowSaramaClient) RefreshCoordinator(consumerGroup string) error
- func (c *BurrowSaramaClient) RefreshMetadata(topics ...string) error
- func (c *BurrowSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error)
- func (c *BurrowSaramaClient) Topics() ([]string, error)
- func (c *BurrowSaramaClient) WritablePartitions(topic string) ([]int32, error)
- type BurrowZookeeperClient
- func (z *BurrowZookeeperClient) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error)
- func (z *BurrowZookeeperClient) Close()
- func (z *BurrowZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
- func (z *BurrowZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)
- func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)
- func (z *BurrowZookeeperClient) NewLock(path string) protocol.ZookeeperLock
- type MockModule
- func (m *MockModule) AcceptConsumerGroup(status *protocol.ConsumerGroupStatus) bool
- func (m *MockModule) Configure(name string, configRoot string)
- func (m *MockModule) GetGroupBlacklist() *regexp.Regexp
- func (m *MockModule) GetGroupWhitelist() *regexp.Regexp
- func (m *MockModule) GetLogger() *zap.Logger
- func (m *MockModule) GetName() string
- func (m *MockModule) Notify(status *protocol.ConsumerGroupStatus, eventID string, startTime time.Time, ...)
- func (m *MockModule) Start() error
- func (m *MockModule) Stop() error
- type MockSaramaBroker
- type MockSaramaClient
- func (m *MockSaramaClient) Brokers() []SaramaBroker
- func (m *MockSaramaClient) Close() error
- func (m *MockSaramaClient) Closed() bool
- func (m *MockSaramaClient) Config() *sarama.Config
- func (m *MockSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error)
- func (m *MockSaramaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error)
- func (m *MockSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
- func (m *MockSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error)
- func (m *MockSaramaClient) NewConsumerFromClient() (sarama.Consumer, error)
- func (m *MockSaramaClient) Partitions(topic string) ([]int32, error)
- func (m *MockSaramaClient) RefreshCoordinator(consumerGroup string) error
- func (m *MockSaramaClient) RefreshMetadata(topics ...string) error
- func (m *MockSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error)
- func (m *MockSaramaClient) Topics() ([]string, error)
- func (m *MockSaramaClient) WritablePartitions(topic string) ([]int32, error)
- type MockSaramaConsumer
- func (m *MockSaramaConsumer) Close() error
- func (m *MockSaramaConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
- func (m *MockSaramaConsumer) HighWaterMarks() map[string]map[int32]int64
- func (m *MockSaramaConsumer) Partitions(topic string) ([]int32, error)
- func (m *MockSaramaConsumer) Topics() ([]string, error)
- type MockSaramaPartitionConsumer
- func (m *MockSaramaPartitionConsumer) AsyncClose()
- func (m *MockSaramaPartitionConsumer) Close() error
- func (m *MockSaramaPartitionConsumer) Errors() <-chan *sarama.ConsumerError
- func (m *MockSaramaPartitionConsumer) HighWaterMarkOffset() int64
- func (m *MockSaramaPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
- type MockTicker
- type MockZookeeperClient
- func (m *MockZookeeperClient) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error)
- func (m *MockZookeeperClient) Close()
- func (m *MockZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
- func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)
- func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)
- func (m *MockZookeeperClient) MockZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error)
- func (m *MockZookeeperClient) NewLock(path string) protocol.ZookeeperLock
- type MockZookeeperLock
- type PausableTicker
- type SaramaBroker
- type SaramaClient
- type Ticker
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
func GetSaramaConfigFromClientProfile ¶
GetSaramaConfigFromClientProfile takes the name of a client-profile configuration entry and returns a sarama.Config object that can be used to create a Sarama client with the specified configuration. This includes the Kafka version, client ID, TLS, and SASL configs. If there is any error in the configuration, such as a bad TLS certificate file, this func will panic as it is normally called when configuring modules.
func StartCoordinatorModules ¶
StartCoordinatorModules is a helper func for coordinators to start a list of modules. Given a map of protocol.Module, it calls the Start func on each one. If any module returns an error, it immediately stops and returns that error.
func StopCoordinatorModules ¶
StopCoordinatorModules is a helper func for coordinators to stop a list of modules. Given a map of protocol.Module, it calls the Stop func on each one. Any errors that are returned are ignored.
func TimeoutSendStorageRequest ¶
func TimeoutSendStorageRequest(storageChannel chan *protocol.StorageRequest, request *protocol.StorageRequest, maxTime int) bool
TimeoutSendStorageRequest is a helper func for sending a protocol.StorageRequest to a channel with a timeout, specified in seconds. If the request is sent, return true. Otherwise, if the timeout is hit, return false.
func ValidateEmail ¶
ValidateEmail returns true if the provided string is an email address. This is a very simplistic validator - the string must be of the form (something)@(something).(something)
func ValidateFilename ¶
ValidateFilename returns true if the provided string is a sane-looking filename (not just a valid filename, which could be almost anything). Right now, this is defined to be the same thing as ValidateTopic.
func ValidateHostList ¶
ValidateHostList returns true if the provided slice of strings can all be parsed by ValidateHostPort
func ValidateHostPort ¶
ValidateHostPort returns true if the provided string is of the form "hostname:port", where hostname is a valid hostname or IP address (as parsed by ValidateIP or ValidateHostname), and port is a valid integer.
func ValidateHostname ¶
ValidateHostname returns true if the provided string can be parsed as a hostname. In general this means:
* One or more segments delimited by a '.'
* Each segment can be no more than 63 characters long
* Valid characters in a segment are letters, numbers, and dashes
* Segments may not start or end with a dash
* The exception is IPv6 addresses, which are also permitted.
func ValidateIP ¶
ValidateIP returns true if the provided string can be parsed as an IP address (either IPv4 or IPv6).
func ValidateTopic ¶
ValidateTopic returns true if the provided string is a valid topic name, which may only contain letters, numbers, underscores, dashes, and periods.
func ValidateURL ¶
ValidateURL returns true if the provided string can be parsed as a URL. We use the net/url Parse func for this.
func ValidateZookeeperPath ¶
ValidateZookeeperPath returns true if the provided string can be parsed as a Zookeeper node path. This means that it starts with a forward slash, and contains one or more segments that are separated by slashes (but does not end with a slash).
func ZookeeperConnect ¶
func ZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error)
ZookeeperConnect establishes a new connection to a pool of Zookeeper servers. The provided session timeout sets the amount of time for which a session is considered valid after losing connection to a server. Within the session timeout it's possible to reestablish a connection to a different server and keep the same session. This means any ephemeral nodes and watches are maintained.
Types ¶
type BurrowSaramaBroker ¶
type BurrowSaramaBroker struct {
// contains filtered or unexported fields
}
BurrowSaramaBroker is an implementation of the SaramaBroker interface that is used with SaramaClient
func (*BurrowSaramaBroker) Close ¶
func (b *BurrowSaramaBroker) Close() error
Close closes the connection associated with the broker
func (*BurrowSaramaBroker) GetAvailableOffsets ¶
func (b *BurrowSaramaBroker) GetAvailableOffsets(request *sarama.OffsetRequest) (*sarama.OffsetResponse, error)
GetAvailableOffsets sends an OffsetRequest to the broker and returns the OffsetResponse that was received
func (*BurrowSaramaBroker) ID ¶
func (b *BurrowSaramaBroker) ID() int32
ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
type BurrowSaramaClient ¶
BurrowSaramaClient is an implementation of the SaramaClient interface for use in Burrow modules
func (*BurrowSaramaClient) Brokers ¶
func (c *BurrowSaramaClient) Brokers() []SaramaBroker
Brokers returns the current set of active brokers as retrieved from cluster metadata.
func (*BurrowSaramaClient) Close ¶
func (c *BurrowSaramaClient) Close() error
Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.
func (*BurrowSaramaClient) Closed ¶
func (c *BurrowSaramaClient) Closed() bool
Closed returns true if the client has already had Close called on it
func (*BurrowSaramaClient) Config ¶
func (c *BurrowSaramaClient) Config() *sarama.Config
Config returns the Config struct of the client. This struct should not be altered after it has been created.
func (*BurrowSaramaClient) Coordinator ¶
func (c *BurrowSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error)
Coordinator returns the coordinating broker for a consumer group. It will return a locally cached value if it's available. You can call RefreshCoordinator to update the cached value. This function only works on Kafka 0.8.2 and higher.
func (*BurrowSaramaClient) GetOffset ¶
GetOffset queries the cluster to get the most recent available offset at the given time (in milliseconds) on the topic/partition combination. Time should be OffsetOldest for the earliest available offset, OffsetNewest for the offset of the message that will be produced next, or a time.
func (*BurrowSaramaClient) InSyncReplicas ¶
func (c *BurrowSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
InSyncReplicas returns the set of all in-sync replica IDs for the given partition. In-sync replicas are replicas which are fully caught up with the partition leader.
func (*BurrowSaramaClient) Leader ¶
func (c *BurrowSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error)
Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the cluster metadata.
func (*BurrowSaramaClient) NewConsumerFromClient ¶
func (c *BurrowSaramaClient) NewConsumerFromClient() (sarama.Consumer, error)
NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.
func (*BurrowSaramaClient) Partitions ¶
func (c *BurrowSaramaClient) Partitions(topic string) ([]int32, error)
Partitions returns the sorted list of all partition IDs for the given topic.
func (*BurrowSaramaClient) RefreshCoordinator ¶
func (c *BurrowSaramaClient) RefreshCoordinator(consumerGroup string) error
RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache. This function only works on Kafka 0.8.2 and higher.
func (*BurrowSaramaClient) RefreshMetadata ¶
func (c *BurrowSaramaClient) RefreshMetadata(topics ...string) error
RefreshMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics. If no topics are provided, it will refresh metadata for all topics.
func (*BurrowSaramaClient) Replicas ¶
func (c *BurrowSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error)
Replicas returns the set of all replica IDs for the given partition.
func (*BurrowSaramaClient) Topics ¶
func (c *BurrowSaramaClient) Topics() ([]string, error)
Topics returns the set of available topics as retrieved from cluster metadata.
func (*BurrowSaramaClient) WritablePartitions ¶
func (c *BurrowSaramaClient) WritablePartitions(topic string) ([]int32, error)
WritablePartitions returns the sorted list of all writable partition IDs for the given topic, where "writable" means "having a valid leader accepting writes".
type BurrowZookeeperClient ¶
type BurrowZookeeperClient struct {
// contains filtered or unexported fields
}
BurrowZookeeperClient is an implementation of protocol.ZookeeperClient
func (*BurrowZookeeperClient) ChildrenW ¶
ChildrenW returns a slice of names of child ZNodes immediately underneath the specified parent path. It also returns a zk.Stat describing the parent path, and a channel over which a zk.Event object will be sent if the child list changes (a child is added or deleted).
func (*BurrowZookeeperClient) Close ¶
func (z *BurrowZookeeperClient) Close()
Close shuts down the connection to the Zookeeper ensemble.
func (*BurrowZookeeperClient) Create ¶
func (z *BurrowZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is desired, specify
zk.WorldACL(zk.PermAll)
func (*BurrowZookeeperClient) ExistsW ¶
ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the node (an exists watch if the node does not currently exist, or a data watch otherwise), providing an event channel that will receive a message when the watch fires.
func (*BurrowZookeeperClient) GetW ¶
GetW returns the data in the specified ZNode as a slice of bytes. It also returns a zk.Stat describing the ZNode, and a channel over which a zk.Event object will be sent if the ZNode changes (data changed, or ZNode deleted).
func (*BurrowZookeeperClient) NewLock ¶
func (z *BurrowZookeeperClient) NewLock(path string) protocol.ZookeeperLock
NewLock creates a lock using the provided path. Multiple Zookeeper clients, using the same lock path, can synchronize with each other to assure that only one client has the lock at any point.
type MockModule ¶
MockModule is a mock of protocol.Module that also satisfies the various subsystem Module variants, and is used in tests. It should never be used in the normal code.
func (*MockModule) AcceptConsumerGroup ¶
func (m *MockModule) AcceptConsumerGroup(status *protocol.ConsumerGroupStatus) bool
AcceptConsumerGroup mocks the notifier.Module AcceptConsumerGroup func
func (*MockModule) Configure ¶
func (m *MockModule) Configure(name string, configRoot string)
Configure mocks the protocol.Module Configure func
func (*MockModule) GetGroupBlacklist ¶
func (m *MockModule) GetGroupBlacklist() *regexp.Regexp
GetGroupBlacklist mocks the notifier.Module GetGroupBlacklist func
func (*MockModule) GetGroupWhitelist ¶
func (m *MockModule) GetGroupWhitelist() *regexp.Regexp
GetGroupWhitelist mocks the notifier.Module GetGroupWhitelist func
func (*MockModule) GetLogger ¶
func (m *MockModule) GetLogger() *zap.Logger
GetLogger mocks the notifier.Module GetLogger func
func (*MockModule) GetName ¶
func (m *MockModule) GetName() string
GetName mocks the notifier.Module GetName func
func (*MockModule) Notify ¶
func (m *MockModule) Notify(status *protocol.ConsumerGroupStatus, eventID string, startTime time.Time, stateGood bool)
Notify mocks the notifier.Module Notify func
func (*MockModule) Start ¶
func (m *MockModule) Start() error
Start mocks the protocol.Module Start func
func (*MockModule) Stop ¶
func (m *MockModule) Stop() error
Stop mocks the protocol.Module Stop func
type MockSaramaBroker ¶
MockSaramaBroker is a mock of SaramaBroker. It is used in tests by multiple packages. It should never be used in the normal code.
func (*MockSaramaBroker) Close ¶
func (m *MockSaramaBroker) Close() error
Close mocks SaramaBroker.Close
func (*MockSaramaBroker) GetAvailableOffsets ¶
func (m *MockSaramaBroker) GetAvailableOffsets(request *sarama.OffsetRequest) (*sarama.OffsetResponse, error)
GetAvailableOffsets mocks SaramaBroker.GetAvailableOffsets
type MockSaramaClient ¶
MockSaramaClient is a mock of SaramaClient. It is used in tests by multiple packages. It should never be used in the normal code.
func (*MockSaramaClient) Brokers ¶
func (m *MockSaramaClient) Brokers() []SaramaBroker
Brokers mocks SaramaClient.Brokers
func (*MockSaramaClient) Close ¶
func (m *MockSaramaClient) Close() error
Close mocks SaramaClient.Close
func (*MockSaramaClient) Closed ¶
func (m *MockSaramaClient) Closed() bool
Closed mocks SaramaClient.Closed
func (*MockSaramaClient) Config ¶
func (m *MockSaramaClient) Config() *sarama.Config
Config mocks SaramaClient.Config
func (*MockSaramaClient) Coordinator ¶
func (m *MockSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error)
Coordinator mocks SaramaClient.Coordinator
func (*MockSaramaClient) InSyncReplicas ¶
func (m *MockSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)
InSyncReplicas mocks SaramaClient.InSyncReplicas
func (*MockSaramaClient) Leader ¶
func (m *MockSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error)
Leader mocks SaramaClient.Leader
func (*MockSaramaClient) NewConsumerFromClient ¶
func (m *MockSaramaClient) NewConsumerFromClient() (sarama.Consumer, error)
NewConsumerFromClient mocks SaramaClient.NewConsumerFromClient
func (*MockSaramaClient) Partitions ¶
func (m *MockSaramaClient) Partitions(topic string) ([]int32, error)
Partitions mocks SaramaClient.Partitions
func (*MockSaramaClient) RefreshCoordinator ¶
func (m *MockSaramaClient) RefreshCoordinator(consumerGroup string) error
RefreshCoordinator mocks SaramaClient.RefreshCoordinator
func (*MockSaramaClient) RefreshMetadata ¶
func (m *MockSaramaClient) RefreshMetadata(topics ...string) error
RefreshMetadata mocks SaramaClient.RefreshMetadata
func (*MockSaramaClient) Replicas ¶
func (m *MockSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error)
Replicas mocks SaramaClient.Replicas
func (*MockSaramaClient) Topics ¶
func (m *MockSaramaClient) Topics() ([]string, error)
Topics mocks SaramaClient.Topics
func (*MockSaramaClient) WritablePartitions ¶
func (m *MockSaramaClient) WritablePartitions(topic string) ([]int32, error)
WritablePartitions mocks SaramaClient.WritablePartitions
type MockSaramaConsumer ¶
MockSaramaConsumer is a mock of sarama.Consumer. It is used in tests by multiple packages. It should never be used in the normal code.
func (*MockSaramaConsumer) Close ¶
func (m *MockSaramaConsumer) Close() error
Close mocks sarama.Consumer.Close
func (*MockSaramaConsumer) ConsumePartition ¶
func (m *MockSaramaConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
ConsumePartition mocks sarama.Consumer.ConsumePartition
func (*MockSaramaConsumer) HighWaterMarks ¶
func (m *MockSaramaConsumer) HighWaterMarks() map[string]map[int32]int64
HighWaterMarks mocks sarama.Consumer.HighWaterMarks
func (*MockSaramaConsumer) Partitions ¶
func (m *MockSaramaConsumer) Partitions(topic string) ([]int32, error)
Partitions mocks sarama.Consumer.Partitions
func (*MockSaramaConsumer) Topics ¶
func (m *MockSaramaConsumer) Topics() ([]string, error)
Topics mocks sarama.Consumer.Topics
type MockSaramaPartitionConsumer ¶
MockSaramaPartitionConsumer is a mock of sarama.PartitionConsumer. It is used in tests by multiple packages. It should never be used in the normal code.
func (*MockSaramaPartitionConsumer) AsyncClose ¶
func (m *MockSaramaPartitionConsumer) AsyncClose()
AsyncClose mocks sarama.PartitionConsumer.AsyncClose
func (*MockSaramaPartitionConsumer) Close ¶
func (m *MockSaramaPartitionConsumer) Close() error
Close mocks sarama.PartitionConsumer.Close
func (*MockSaramaPartitionConsumer) Errors ¶
func (m *MockSaramaPartitionConsumer) Errors() <-chan *sarama.ConsumerError
Errors mocks sarama.PartitionConsumer.Errors
func (*MockSaramaPartitionConsumer) HighWaterMarkOffset ¶
func (m *MockSaramaPartitionConsumer) HighWaterMarkOffset() int64
HighWaterMarkOffset mocks sarama.PartitionConsumer.HighWaterMarkOffset
func (*MockSaramaPartitionConsumer) Messages ¶
func (m *MockSaramaPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages mocks sarama.PartitionConsumer.Messages
type MockTicker ¶
MockTicker is a mock Ticker interface that can be used for testing. It should not be used in normal code.
func (*MockTicker) GetChannel ¶
func (m *MockTicker) GetChannel() <-chan time.Time
GetChannel mocks Ticker.GetChannel
type MockZookeeperClient ¶
type MockZookeeperClient struct { mock.Mock // InitialError can be set before using the MockZookeeperConnect call to specify an error that should be returned // from that call. InitialError error // EventChannel can be set before using the MockZookeeperConnect call to provide the channel that that call returns. EventChannel chan zk.Event // Servers stores the slice of strings that is provided to MockZookeeperConnect Servers []string // SessionTimeout stores the value that is provided to MockZookeeperConnect SessionTimeout time.Duration }
MockZookeeperClient is a mock of the protocol.ZookeeperClient interface to be used for testing. It should not be used in normal code.
func (*MockZookeeperClient) Close ¶
func (m *MockZookeeperClient) Close()
Close mocks protocol.ZookeeperClient.Close
func (*MockZookeeperClient) Create ¶
func (m *MockZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Create mocks protocol.ZookeeperClient.Create
func (*MockZookeeperClient) MockZookeeperConnect ¶
func (m *MockZookeeperClient) MockZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error)
MockZookeeperConnect is a func that mocks the ZookeeperConnect call, but allows us to pre-populate the return values and save the arguments provided for assertions.
func (*MockZookeeperClient) NewLock ¶
func (m *MockZookeeperClient) NewLock(path string) protocol.ZookeeperLock
NewLock mocks protocol.ZookeeperClient.NewLock
type MockZookeeperLock ¶
MockZookeeperLock is a mock of the protocol.ZookeeperLock interface. It should not be used in normal code.
func (*MockZookeeperLock) Lock ¶
func (m *MockZookeeperLock) Lock() error
Lock mocks protocol.ZookeeperLock.Lock
func (*MockZookeeperLock) Unlock ¶
func (m *MockZookeeperLock) Unlock() error
Unlock mocks protocol.ZookeeperLock.Unlock
type PausableTicker ¶
type PausableTicker struct {
// contains filtered or unexported fields
}
PausableTicker is an implementation of Ticker which can be stopped and restarted without changing the underlying channel. This is useful for cases where you may need to stop performing actions for a while (such as sending notifications), but you do not want to tear down everything.
func (*PausableTicker) GetChannel ¶
func (ticker *PausableTicker) GetChannel() <-chan time.Time
GetChannel returns the channel over which ticks will be sent. This channel can be used over multiple Start/Stop cycles, and will not be closed.
func (*PausableTicker) Start ¶
func (ticker *PausableTicker) Start()
Start begins sending ticks over the channel at the interval that has already been configured. If the ticker is already sending ticks, this func has no effect.
func (*PausableTicker) Stop ¶
func (ticker *PausableTicker) Stop()
Stop stops ticks from being sent over the channel. If the ticker is not currently sending ticks, this func has no effect.
type SaramaBroker ¶
type SaramaBroker interface { // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known. ID() int32 // Close closes the connection associated with the broker Close() error // GetAvailableOffsets sends an OffsetRequest to the broker and returns the OffsetResponse that was received GetAvailableOffsets(*sarama.OffsetRequest) (*sarama.OffsetResponse, error) }
SaramaBroker is an internal interface on the sarama.Broker struct. It is used with the SaramaClient interface in order to provide a fully testable interface for the pieces of Sarama that are used inside Burrow. Currently, this interface only defines the methods that Burrow is using. It should not be considered a complete interface for sarama.Broker
type SaramaClient ¶
type SaramaClient interface { // Config returns the Config struct of the client. This struct should not be altered after it has been created. Config() *sarama.Config // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []SaramaBroker // Topics returns the set of available topics as retrieved from cluster metadata. Topics() ([]string, error) // Partitions returns the sorted list of all partition IDs for the given topic. Partitions(topic string) ([]int32, error) // WritablePartitions returns the sorted list of all writable partition IDs for the given topic, where "writable" // means "having a valid leader accepting writes". WritablePartitions(topic string) ([]int32, error) // Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the // cluster metadata. Leader(topic string, partitionID int32) (SaramaBroker, error) // Replicas returns the set of all replica IDs for the given partition. Replicas(topic string, partitionID int32) ([]int32, error) // InSyncReplicas returns the set of all in-sync replica IDs for the given partition. In-sync replicas are replicas // which are fully caught up with the partition leader. InSyncReplicas(topic string, partitionID int32) ([]int32, error) // RefreshMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics. // If no topics are provided, it will refresh metadata for all topics. RefreshMetadata(topics ...string) error // GetOffset queries the cluster to get the most recent available offset at the given time (in milliseconds) on the // topic/partition combination. Time should be OffsetOldest for the earliest available offset, OffsetNewest for the // offset of the message that will be produced next, or a time. GetOffset(topic string, partitionID int32, time int64) (int64, error) // Coordinator returns the coordinating broker for a consumer group. 
It will return a locally cached value if it's // available. You can call RefreshCoordinator to update the cached value. This function only works on Kafka 0.8.2 and // higher. Coordinator(consumerGroup string) (SaramaBroker, error) // RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache. This function only // works on Kafka 0.8.2 and higher. RefreshCoordinator(consumerGroup string) error // Close shuts down all broker connections managed by this client. It is required to call this function before a client // object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a // client before you close the client. Close() error // Closed returns true if the client has already had Close called on it Closed() bool // NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the // underlying client when shutting down this consumer. NewConsumerFromClient() (sarama.Consumer, error) }
SaramaClient is an internal interface to the sarama.Client. We use our own interface because while sarama.Client is an interface, sarama.Broker is not. This makes it difficult to test code which uses the Broker objects. This interface operates in the same way, with the addition of an interface function for creating consumers on the client.
type Ticker ¶
type Ticker interface { // Start sending ticks over the channel Start() // Stop sending ticks over the channel Stop() // Return the channel that ticks will be sent over GetChannel() <-chan time.Time }
Ticker is a generic interface for a channel that delivers `ticks' of a clock at intervals.
func NewPausableTicker ¶
NewPausableTicker returns a Ticker that has not yet been started, but the channel is ready to use. This ticker can be started and stopped multiple times without needing to swap the ticker channel.
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool