kafkatest

package
v0.0.0-...-339d904
Published: Mar 12, 2018 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package kafkatest provides mock objects for the high-level kafka interface.

Use the NewBroker function to create a mock broker object, then use its standard methods to create producers and consumers.

Variables

var (
	ErrTimeout = errors.New("timeout")

	ErrNotImplemented = errors.New("not implemented")
)

Functions

func SetLogger

func SetLogger(l *logging.Logger)

Types

type Broker

type Broker struct {

	// OffsetEarliestHandler is a callback function called whenever the
	// OffsetEarliest method of the broker is called. Overwrite it to change
	// the default behaviour -- always returning ErrUnknownTopicOrPartition.
	OffsetEarliestHandler func(string, int32) (int64, error)

	// OffsetLatestHandler is a callback function called whenever the
	// OffsetLatest method of the broker is called. Overwrite it to change the
	// default behaviour -- always returning ErrUnknownTopicOrPartition.
	OffsetLatestHandler func(string, int32) (int64, error)
	// contains filtered or unexported fields
}

Broker is a mock version of kafka's broker. It implements the Broker interface and provides an easy way of mocking server actions.

func NewBroker

func NewBroker() *Broker

func (*Broker) Close

func (b *Broker) Close()

Close is a no-op method, required by the Broker interface.

func (*Broker) Consumer

func (b *Broker) Consumer(conf kafka.ConsumerConf) (kafka.Consumer, error)

Consumer returns a consumer mock and never returns an error.

At most one consumer for every topic-partition pair can be created -- calling this for the same topic-partition will always return the same consumer instance.

Example
broker := NewBroker()
msg := &proto.Message{Value: []byte("first")}

// mock server actions, pushing data through consumer
go func() {
	consumer, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0))
	c := consumer.(*Consumer)
	// it is possible to send messages through consumer...
	c.Messages <- msg

	// every consumer fetch call blocks until there is either a message
	// or an error ready to return; this way we can test slow consumers
	time.Sleep(time.Millisecond * 20)

	// ...as well as push errors to mock failure
	c.Errors <- errors.New("expected error is expected")
}()

// test broker never fails creating consumer
consumer, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0))

m, err := consumer.Consume()
if err == nil {
	fmt.Printf("Value: %q\n", m.Value)
}
if _, err = consumer.Consume(); err != nil {
	fmt.Printf("Error: %s\n", err)
}
Output:


Value: "first"
Error: expected error is expected

func (*Broker) OffsetCoordinator

func (b *Broker) OffsetCoordinator(conf kafka.OffsetCoordinatorConf) (kafka.OffsetCoordinator, error)

OffsetCoordinator returns an offset coordinator mock instance. It always succeeds, so the returned error can safely be ignored.

func (*Broker) OffsetEarliest

func (b *Broker) OffsetEarliest(topic string, partition int32) (int64, error)

OffsetEarliest returns the result of the OffsetEarliestHandler callback set on the broker. If the handler is not set, it always returns ErrUnknownTopicOrPartition.

func (*Broker) OffsetLatest

func (b *Broker) OffsetLatest(topic string, partition int32) (int64, error)

OffsetLatest returns the result of the OffsetLatestHandler callback set on the broker. If the handler is not set, it always returns ErrUnknownTopicOrPartition.
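
The handler fields follow a callback-with-default pattern. The following is a minimal sketch of that pattern using only the standard library; mockBroker, offsetLatest and errUnknownTopicOrPartition are hypothetical stand-ins, and the nil check is an assumption about how the default is selected, not the package's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnknownTopicOrPartition is a hypothetical stand-in for the error the
// mock returns by default.
var errUnknownTopicOrPartition = errors.New("unknown topic or partition")

// mockBroker is a hypothetical type that mirrors only the handler field.
type mockBroker struct {
	OffsetLatestHandler func(topic string, partition int32) (int64, error)
}

// offsetLatest delegates to the handler when one is set and otherwise
// falls back to the default error (assumed behaviour).
func (b *mockBroker) offsetLatest(topic string, partition int32) (int64, error) {
	if b.OffsetLatestHandler == nil {
		return 0, errUnknownTopicOrPartition
	}
	return b.OffsetLatestHandler(topic, partition)
}

func main() {
	b := &mockBroker{}
	_, err := b.offsetLatest("my-topic", 0)
	fmt.Println("default:", err)

	// Override the handler to simulate a partition whose latest offset is 42.
	b.OffsetLatestHandler = func(topic string, partition int32) (int64, error) {
		return 42, nil
	}
	off, err := b.offsetLatest("my-topic", 0)
	fmt.Println("overridden:", off, err)
}
```

Keeping the handler as a plain struct field lets each test swap in its own behaviour without defining a new type.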

func (*Broker) Producer

func (b *Broker) Producer(kafka.ProducerConf) kafka.Producer

Producer returns a producer mock instance.

Example
broker := NewBroker()
msg := &proto.Message{Value: []byte("first")}

producer := broker.Producer(kafka.NewProducerConf())

// mock server actions, handling any produce call
go func() {
	resp, err := broker.ReadProducers(time.Millisecond * 20)
	if err != nil {
		panic(fmt.Sprintf("failed reading producers: %s", err))
	}
	if len(resp.Messages) != 1 {
		panic("expected single message")
	}
	if !reflect.DeepEqual(resp.Messages[0], msg) {
		panic("expected different message")
	}
}()

// provide data for above goroutine
_, err := producer.Produce("my-topic", 0, msg)
if err != nil {
	panic(fmt.Sprintf("cannot produce message: %s", err))
}

mockProducer := producer.(*Producer)

// test error handling by forcing producer to return error,
//
// it is possible to manipulate produce result by changing producer's
// ResponseOffset and ResponseError attributes
mockProducer.ResponseError = errors.New("my spoon is too big!")
_, err = producer.Produce("my-topic", 0, msg)
fmt.Printf("Error: %s\n", err)
Output:


Error: my spoon is too big!

func (*Broker) ReadProducers

func (b *Broker) ReadProducers(timeout time.Duration) (*ProducedMessages, error)

ReadProducers returns a ProducedMessages value representing a produce call made by one of the producers created by the broker, or ErrTimeout if no call arrives within the timeout.
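
A read-with-timeout of this kind is commonly built from a select over a channel and time.After. The sketch below illustrates that idea with the standard library only; producedCall, readWithTimeout and errTimeout are hypothetical names, and the use of an unbuffered channel is an assumption, not a description of the package internals.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical stand-in for ErrTimeout.
var errTimeout = errors.New("timeout")

// producedCall is a hypothetical, simplified stand-in for ProducedMessages.
type producedCall struct {
	Topic     string
	Partition int32
	Values    []string
}

// readWithTimeout waits for one produce call and gives up after timeout.
func readWithTimeout(calls <-chan *producedCall, timeout time.Duration) (*producedCall, error) {
	select {
	case c := <-calls:
		return c, nil
	case <-time.After(timeout):
		return nil, errTimeout
	}
}

func main() {
	// Unbuffered channel: the sending side blocks until the call is read.
	calls := make(chan *producedCall)

	go func() {
		calls <- &producedCall{Topic: "my-topic", Partition: 0, Values: []string{"first"}}
	}()

	c, err := readWithTimeout(calls, time.Second)
	if err == nil {
		fmt.Println("read:", c.Topic, c.Values)
	}

	// Nothing else is produced, so the second read times out.
	_, err = readWithTimeout(calls, 10*time.Millisecond)
	fmt.Println("second read:", err)
}
```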

type Consumer

type Consumer struct {
	Broker *Broker

	// Messages is the channel consumed by the fetch method call. Pushing a
	// message into this channel results in the Consume method call returning
	// that message.
	Messages chan *proto.Message

	// Errors is the channel consumed by the fetch method call. Pushing an
	// error into this channel results in the Consume method call returning
	// that error.
	Errors chan error
	// contains filtered or unexported fields
}

Consumer mocks kafka's consumer. Use the Messages and Errors channels to mock the results of the Consume method.

func (*Consumer) Consume

func (c *Consumer) Consume() (*proto.Message, error)

Consume returns a message or an error pushed through the consumer's Messages and Errors channels. The call blocks until data is available on at least one of those channels.
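
Blocking on two channels at once is typically written as a select with one case per channel. The following standard-library sketch shows the idea; mockConsumer, consume and errMock are hypothetical, messages are simplified to strings, and nothing here is taken from the package source.

```go
package main

import (
	"errors"
	"fmt"
)

// errMock is a hypothetical error used to simulate a failed fetch.
var errMock = errors.New("expected error is expected")

// mockConsumer is a hypothetical type with the same two channels,
// using strings in place of *proto.Message.
type mockConsumer struct {
	Messages chan string
	Errors   chan error
}

// consume blocks until either channel has a value ready.
func (c *mockConsumer) consume() (string, error) {
	select {
	case m := <-c.Messages:
		return m, nil
	case err := <-c.Errors:
		return "", err
	}
}

func main() {
	c := &mockConsumer{Messages: make(chan string), Errors: make(chan error)}

	// Simulate the server side: one message, then one failure.
	go func() {
		c.Messages <- "first"
		c.Errors <- errMock
	}()

	if m, err := c.consume(); err == nil {
		fmt.Printf("Value: %q\n", m)
	}
	if _, err := c.consume(); err != nil {
		fmt.Printf("Error: %s\n", err)
	}
}
```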

func (*Consumer) SeekToLatest

func (c *Consumer) SeekToLatest() error

SeekToLatest discards all messages currently enqueued, unless an error is available first.

type Middleware

type Middleware func(nodeID int32, requestKind int16, content []byte) Response

Middleware is a function that is called for every incoming kafka message, before the default processing handler runs. A middleware function can return nil or a kafka response message.

type OffsetCoordinator

type OffsetCoordinator struct {
	Broker *Broker

	// Offsets is used to store all offset commits when using the mocked
	// coordinator's default behaviour.
	Offsets map[string]int64

	// CommitHandler is a callback function called whenever the Commit method
	// of the OffsetCoordinator is called. If CommitHandler is nil, the Commit
	// method stores the offset in the Offsets attribute.
	CommitHandler func(consumerGroup string, topic string, partition int32, offset int64) error

	// OffsetHandler is a callback function called whenever the Offset method
	// of the OffsetCoordinator is called. If OffsetHandler is nil, the Offset
	// method uses the Offsets attribute to retrieve the offset.
	OffsetHandler func(consumerGroup string, topic string, partition int32) (offset int64, metadata string, err error)
	// contains filtered or unexported fields
}

func (*OffsetCoordinator) Close

func (c *OffsetCoordinator) Close()

func (*OffsetCoordinator) Commit

func (c *OffsetCoordinator) Commit(topic string, partition int32, offset int64) error

Commit returns the result of the CommitHandler callback set on the coordinator. If the handler is nil, this method uses the Offsets attribute to store the data for later use.

func (*OffsetCoordinator) Offset

func (c *OffsetCoordinator) Offset(topic string, partition int32) (offset int64, metadata string, err error)

Offset returns the result of the OffsetHandler callback set on the coordinator. If the handler is nil, this method uses the Offsets attribute to retrieve the committed offset. If no offset was saved for the given topic and partition pair, proto.ErrUnknownTopicOrPartition is returned.
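
The default commit and lookup behaviour can be pictured as a map-backed store. The sketch below is a simplified standard-library model; mockCoordinator, key and errUnknownTopicOrPartition are hypothetical, the "topic:partition" key format is an assumption made for illustration, and the metadata return value is omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnknownTopicOrPartition is a hypothetical stand-in for
// proto.ErrUnknownTopicOrPartition.
var errUnknownTopicOrPartition = errors.New("unknown topic or partition")

// mockCoordinator is a hypothetical type that keeps only the Offsets store.
type mockCoordinator struct {
	Offsets map[string]int64
}

// key builds the map key; the format is an assumption for this sketch.
func key(topic string, partition int32) string {
	return fmt.Sprintf("%s:%d", topic, partition)
}

// commit records the offset for the topic and partition pair.
func (c *mockCoordinator) commit(topic string, partition int32, offset int64) error {
	c.Offsets[key(topic, partition)] = offset
	return nil
}

// offset returns the committed offset, or an error if none was saved.
func (c *mockCoordinator) offset(topic string, partition int32) (int64, error) {
	off, ok := c.Offsets[key(topic, partition)]
	if !ok {
		return 0, errUnknownTopicOrPartition
	}
	return off, nil
}

func main() {
	c := &mockCoordinator{Offsets: make(map[string]int64)}

	_, err := c.offset("my-topic", 0)
	fmt.Println("before commit:", err)

	_ = c.commit("my-topic", 0, 10)
	off, err := c.offset("my-topic", 0)
	fmt.Println("after commit:", off, err)
}
```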

type ProducedMessages

type ProducedMessages struct {
	Topic     string
	Partition int32
	Messages  []*proto.Message
}

ProducedMessages represents all arguments used for a single Produce method call.

type Producer

type Producer struct {
	Broker *Broker

	// ResponseOffset is the offset counter returned and incremented by every
	// Produce method call. It is set to 1 by default.
	ResponseOffset int64

	// ResponseError, if set, forces the Produce method call to return the
	// error instantly, without publishing messages. It is nil by default.
	ResponseError error
}

Producer mocks kafka's producer.

func (*Producer) Produce

func (p *Producer) Produce(topic string, partition int32, messages ...*proto.Message) (int64, error)

Produce sets the Crc and Offset attributes of the messages and pushes all passed arguments to the broker. The Produce call blocks until the pushed messages are read with the broker's ReadProducers method.

type Response

type Response interface {
	Bytes() ([]byte, error)
}

Response is any kafka response as defined in the kafka/proto package.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a container for fake kafka server data.

Example
// simulate server latency for all fetch requests
delayFetch := func(nodeID int32, reqKind int16, content []byte) Response {
	if reqKind != proto.FetchReqKind {
		return nil
	}
	time.Sleep(time.Millisecond * 500)
	return nil
}

server := NewServer(delayFetch)
server.MustSpawn()
defer func() {
	_ = server.Close()
}()
fmt.Printf("running server: %s", server.Addr())

server.AddMessages("my-topic", 0,
	&proto.Message{Value: []byte("first")},
	&proto.Message{Value: []byte("second")})

// connect to server using broker and fetch/write messages
Output:

func NewServer

func NewServer(middlewares ...Middleware) *Server

NewServer returns a new mock server instance. Any number of middlewares can be passed to customize request handling. For every incoming request, all middlewares are called one after another in the order they were passed. If any middleware returns a non-nil response message, the response is instantly written to the client and no further processing of the request takes place -- no other middleware is called and the default handler is not executed.
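
The short-circuiting middleware chain described here can be sketched as a loop that returns the first non-nil response. The code below uses only the standard library; response, rawResponse, middleware and handle are hypothetical stand-ins that mirror the shape of the Response and Middleware types, and the default handler is passed in explicitly for illustration.

```go
package main

import "fmt"

// response is a hypothetical stand-in for the Response interface.
type response interface {
	Bytes() ([]byte, error)
}

// rawResponse is a hypothetical response made of fixed bytes.
type rawResponse []byte

func (r rawResponse) Bytes() ([]byte, error) { return r, nil }

// middleware is a hypothetical stand-in for the Middleware type.
type middleware func(nodeID int32, requestKind int16, content []byte) response

// handle calls the middlewares in order and returns the first non-nil
// response. The default handler runs only if every middleware returns nil.
func handle(mws []middleware, def middleware, nodeID int32, kind int16, content []byte) response {
	for _, mw := range mws {
		if resp := mw(nodeID, kind, content); resp != nil {
			return resp
		}
	}
	return def(nodeID, kind, content)
}

func main() {
	passThrough := func(int32, int16, []byte) response { return nil }
	intercept := func(_ int32, kind int16, _ []byte) response {
		if kind == 1 {
			return rawResponse("intercepted")
		}
		return nil
	}
	def := func(int32, int16, []byte) response { return rawResponse("default") }

	for _, kind := range []int16{1, 2} {
		resp := handle([]middleware{passThrough, intercept}, def, 0, kind, nil)
		b, _ := resp.Bytes()
		fmt.Printf("kind %d: %s\n", kind, b)
	}
}
```

A middleware that only wants to observe or delay a request, as in the latency example above, returns nil so that processing continues.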

func (*Server) AddMessages

func (s *Server) AddMessages(topic string, partition int32, messages ...*proto.Message)

AddMessages appends messages to the given topic/partition. If the topic or partition does not exist, it is created. To only create the topic/partition, call this method without passing any message.

func (*Server) Addr

func (s *Server) Addr() string

Addr returns the server instance address, or an empty string if the server is not running.

func (*Server) Close

func (s *Server) Close() (err error)

Close shuts down the server if it is running. It is safe to call it more than once.

func (*Server) MustSpawn

func (s *Server) MustSpawn()

MustSpawn runs the server in the background on a random port. It panics if the server cannot be spawned. Use the Close method to stop the spawned server.

func (*Server) Reset

func (s *Server) Reset()

Reset will clear out local messages and topics.

func (*Server) ResetTopic

func (s *Server) ResetTopic(topic string)

ResetTopic removes all messages and committed offsets for a topic, but does not remove the topic or its partitions.

func (*Server) Run

func (s *Server) Run(addr string) error

Run starts the kafka mock server listening on the given address. The function returns only when the listener has exited.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP provides JSON-serialized server state information.
