Documentation ¶
Index ¶
- type Cluster
- func (c *Cluster) AddNode(nodeID int32, port int) (int32, int, error)
- func (c *Cluster) Close()
- func (c *Cluster) Control(fn func(kmsg.Request) (kmsg.Response, error, bool))
- func (c *Cluster) ControlKey(key int16, fn func(kmsg.Request) (kmsg.Response, error, bool))
- func (c *Cluster) KeepControl()
- func (c *Cluster) ListenAddrs() []string
- func (c *Cluster) MoveTopicPartition(topic string, partition int32, nodeID int32) error
- func (c *Cluster) RemoveNode(nodeID int32) error
- func (c *Cluster) ShufflePartitionLeaders()
- type LogLevel
- type Logger
- type Opt
- func AllowAutoTopicCreation() Opt
- func ClusterID(clusterID string) Opt
- func DefaultNumPartitions(n int) Opt
- func EnableSASL() Opt
- func GroupMaxSessionTimeout(d time.Duration) Opt
- func GroupMinSessionTimeout(d time.Duration) Opt
- func NumBrokers(n int) Opt
- func Ports(ports ...int) Opt
- func Superuser(method, user, pass string) Opt
- func WithLogger(logger Logger) Opt
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster is a mock Kafka broker cluster.
func MustCluster ¶
MustCluster is like NewCluster, but panics on error.
func NewCluster ¶
NewCluster returns a new mocked Kafka cluster.
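For illustration, a minimal sketch of starting a mock cluster and pointing a client at it. The kgo client, the import paths, and the chosen options are assumptions for the example; any Kafka client can dial the addresses returned by ListenAddrs.

package main

import (
    "log"

    "github.com/twmb/franz-go/pkg/kfake"
    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    c, err := kfake.NewCluster(
        kfake.NumBrokers(3),
        kfake.ClusterID("kfake-test"),
        kfake.AllowAutoTopicCreation(),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // The mock brokers listen on real ports; seed any client with them.
    cl, err := kgo.NewClient(kgo.SeedBrokers(c.ListenAddrs()...))
    if err != nil {
        log.Fatal(err)
    }
    defer cl.Close()
}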
func (*Cluster) AddNode ¶
AddNode adds a node to the cluster. If nodeID is -1, the next node ID is used. If port is 0 or negative, a random port is chosen. This returns the added node ID and the port used, or an error if the node already exists or the port cannot be listened on.
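A small sketch of growing and shrinking the cluster at runtime, assuming an existing *Cluster named c:

// Let kfake pick the next node ID (-1) and a random port (0).
nodeID, port, err := c.AddNode(-1, 0)
if err != nil {
    log.Fatal(err)
}
log.Printf("added broker %d on port %d", nodeID, port)

// Remove the broker again once it is no longer needed.
if err := c.RemoveNode(nodeID); err != nil {
    log.Fatal(err)
}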
func (*Cluster) Control ¶
Control is a function to call on any client request the cluster handles.
If the control function returns true, then either the response is written back to the client or, if the control function returns an error, the client connection is closed. If both the response and the error are nil, the cluster continues looping and reading from the client, and the client will likely hit a read timeout at some point.
Controlling a request drops the control function from the cluster, meaning that a control function can only control *one* request. To keep the control function handling more requests, you can call KeepControl within your control function.
It is safe to add new control functions within a control function. Control functions are not called concurrently.
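A sketch of passively observing requests, assuming an existing *Cluster named c; the kmsg types come from franz-go, and the sawMetadata flag is illustrative:

var sawMetadata bool
c.Control(func(req kmsg.Request) (kmsg.Response, error, bool) {
    if _, ok := req.(*kmsg.MetadataRequest); ok {
        sawMetadata = true
    }
    // Returning false leaves normal handling of the request to the cluster.
    return nil, nil, false
})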
func (*Cluster) ControlKey ¶
ControlKey is a function to call on any client request the cluster handles that has the given request key.
If the control function returns true, then either the response is written back to the client or, if the control function returns an error, the client connection is closed. If both the response and the error are nil, the cluster continues looping and reading from the client, and the client will likely hit a read timeout at some point.
Controlling a request drops the control function from the cluster, meaning that a control function can only control *one* request. To keep the control function handling more requests, you can call KeepControl within your control function.
It is safe to add new control functions within a control function.
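A sketch of injecting a one-shot failure for a single request type, assuming an existing *Cluster named c; the key 1 (the Kafka Fetch API key) and the injected error are illustrative:

// Close the client connection on the next Fetch request (API key 1).
c.ControlKey(1, func(req kmsg.Request) (kmsg.Response, error, bool) {
    return nil, errors.New("injected fetch failure"), true
})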
func (*Cluster) KeepControl ¶
func (c *Cluster) KeepControl()
KeepControl marks the currently running control function to be kept even if you handle the request and return true. This can be used to continuously control requests without needing to re-add control functions manually.
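A sketch combining ControlKey with KeepControl to keep injecting failures across several requests; the counter and the limit of three are illustrative:

failures := 0
c.ControlKey(1, func(req kmsg.Request) (kmsg.Response, error, bool) {
    if failures < 3 {
        failures++
        // Without KeepControl, handling this request (returning true)
        // would drop this control function after the first failure.
        c.KeepControl()
        return nil, errors.New("injected failure"), true
    }
    return nil, nil, false
})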
func (*Cluster) ListenAddrs ¶
ListenAddrs returns the hostports that the cluster is listening on.
func (*Cluster) MoveTopicPartition ¶
MoveTopicPartition simulates the rebalancing of a partition to an alternative broker. This returns an error if the topic, partition, or node does not exist.
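A sketch of reassigning one partition, e.g. to exercise a client's metadata refresh handling; the topic name and node ID are illustrative and must already exist in the cluster:

// Reassign partition 0 of "events" to broker 1.
if err := c.MoveTopicPartition("events", 0, 1); err != nil {
    log.Fatal(err)
}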
func (*Cluster) RemoveNode ¶
RemoveNode removes a node from the cluster. This returns an error if the node does not exist.
func (*Cluster) ShufflePartitionLeaders ¶
func (c *Cluster) ShufflePartitionLeaders()
ShufflePartitionLeaders simulates a leader election for all partitions: all partitions have a randomly selected new leader and their internal epochs are bumped.
type LogLevel ¶
type LogLevel int8
LogLevel designates which level the logger should log at.
const (
	// LogLevelNone disables logging.
	LogLevelNone LogLevel = iota
	// LogLevelError logs all errors. Generally, these should not happen.
	LogLevelError
	// LogLevelWarn logs all warnings, such as request failures.
	LogLevelWarn
	// LogLevelInfo logs informational messages, such as requests. This is
	// usually the default log level.
	LogLevelInfo
	// LogLevelDebug logs verbose information, and is usually not used in
	// production.
	LogLevelDebug
)
type Opt ¶
type Opt interface {
// contains filtered or unexported methods
}
Opt is an option to configure a cluster.
func AllowAutoTopicCreation ¶
func AllowAutoTopicCreation() Opt
AllowAutoTopicCreation allows metadata requests to create topics if the metadata request has its AllowAutoTopicCreation field set to true.
func DefaultNumPartitions ¶
DefaultNumPartitions sets the number of partitions to create by default for auto-created topics and for CreateTopics requests that ask for -1 partitions.
func EnableSASL ¶
func EnableSASL() Opt
EnableSASL enables SASL authentication for the cluster. If you do not configure a bootstrap user / pass, the default superuser is "admin" / "admin" with the SCRAM-SHA-256 SASL mechanism.
func GroupMaxSessionTimeout ¶
GroupMaxSessionTimeout sets the cluster's maximum session timeout allowed for groups, overriding the default 5 minutes.
func GroupMinSessionTimeout ¶
GroupMinSessionTimeout sets the cluster's minimum session timeout allowed for groups, overriding the default 6 seconds.
func NumBrokers ¶
NumBrokers sets the number of brokers to start in the fake cluster.
func Ports ¶
Ports sets the ports to listen on, overriding the default of randomly choosing NumBrokers ports.
func Superuser ¶
Superuser seeds the cluster with a superuser. The method must be either PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. Note that PLAIN superusers cannot be deleted. SCRAM superusers can be modified with AlterUserScramCredentials. If you delete all SASL users, the kfake cluster will be unusable.
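A sketch of a SASL-enabled cluster with a seeded SCRAM-SHA-256 superuser; the kgo and scram packages from franz-go, and the user "alice" / "secret", are used only to illustrate a client that can authenticate against it:

c := kfake.MustCluster(
    kfake.EnableSASL(),
    kfake.Superuser("SCRAM-SHA-256", "alice", "secret"),
)
defer c.Close()

cl, err := kgo.NewClient(
    kgo.SeedBrokers(c.ListenAddrs()...),
    kgo.SASL(scram.Auth{User: "alice", Pass: "secret"}.AsSha256Mechanism()),
)
if err != nil {
    log.Fatal(err)
}
defer cl.Close()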
Source Files ¶
- 00_produce.go
- 01_fetch.go
- 02_list_offsets.go
- 03_metadata.go
- 08_offset_commit.go
- 09_offset_fetch.go
- 10_find_coordinator.go
- 11_join_group.go
- 12_heartbeat.go
- 13_leave_group.go
- 14_sync_group.go
- 15_describe_groups.go
- 16_list_groups.go
- 17_sasl_handshake.go
- 18_api_versions.go
- 19_create_topics.go
- 20_delete_topics.go
- 22_init_producer_id.go
- 23_offset_for_leader_epoch.go
- 36_sasl_authenticate.go
- 37_create_partitions.go
- 42_delete_groups.go
- 50_describe_user_scram_credentials.go
- 51_alter_user_scram_credentials.go
- client_conn.go
- cluster.go
- config.go
- data.go
- groups.go
- logger.go
- misc.go
- pid.go
- sasl.go
- topic_partition.go