Documentation ¶
Overview ¶
Package kafkatest provides mock objects for high level kafka interface.
Use NewBroker function to create mock broker object and standard methods to create producers and consumers.
Index ¶
- Variables
- func SetLogger(l *logging.Logger)
- type Broker
- func (b *Broker) Close()
- func (b *Broker) Consumer(conf kafka.ConsumerConf) (kafka.Consumer, error)
- func (b *Broker) OffsetCoordinator(conf kafka.OffsetCoordinatorConf) (kafka.OffsetCoordinator, error)
- func (b *Broker) OffsetEarliest(topic string, partition int32) (int64, error)
- func (b *Broker) OffsetLatest(topic string, partition int32) (int64, error)
- func (b *Broker) Producer(kafka.ProducerConf) kafka.Producer
- func (b *Broker) ReadProducers(timeout time.Duration) (*ProducedMessages, error)
- type Consumer
- type Middleware
- type OffsetCoordinator
- type ProducedMessages
- type Producer
- type Response
- type Server
- func (s *Server) AddMessages(topic string, partition int32, messages ...*proto.Message)
- func (s *Server) Addr() string
- func (s *Server) Close() (err error)
- func (s *Server) MustSpawn()
- func (s *Server) Reset()
- func (s *Server) ResetTopic(topic string)
- func (s *Server) Run(addr string) error
- func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrTimeout = errors.New("timeout") ErrNotImplemented = errors.New("not implemented") )
Functions ¶
Types ¶
type Broker ¶
type Broker struct { // OffsetEarliestHandler is callback function called whenever // OffsetEarliest method of the broker is called. Overwrite to change // default behaviour -- always returning ErrUnknownTopicOrPartition OffsetEarliestHandler func(string, int32) (int64, error) // OffsetLatestHandler is callback function called whenever OffsetLatest // method of the broker is called. Overwrite to change default behaviour -- // always returning ErrUnknownTopicOrPartition OffsetLatestHandler func(string, int32) (int64, error) // contains filtered or unexported fields }
Broker is mock version of kafka's broker. It's implementing Broker interface and provides easy way of mocking server actions.
func (*Broker) Close ¶
func (b *Broker) Close()
Close is no operation method, required by Broker interface.
func (*Broker) Consumer ¶
Consumer returns consumer mock and never error.
At most one consumer for every topic-partition pair can be created -- calling this for the same topic-partition will always return the same consumer instance.
Example ¶
broker := NewBroker() msg := &proto.Message{Value: []byte("first")} // mock server actions, pushing data through consumer go func() { consumer, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0)) c := consumer.(*Consumer) // it is possible to send messages through consumer... c.Messages <- msg // every consumer fetch call is blocking untill there is either message // or error ready to return, this way we can test slow consumers time.Sleep(time.Millisecond * 20) // ...as well as push errors to mock failure c.Errors <- errors.New("expected error is expected") }() // test broker never fails creating consumer consumer, _ := broker.Consumer(kafka.NewConsumerConf("my-topic", 0)) m, err := consumer.Consume() if err == nil { fmt.Printf("Value: %q\n", m.Value) } if _, err = consumer.Consume(); err != nil { fmt.Printf("Error: %s\n", err) }
Output: Value: "first" Error: expected error is expected
func (*Broker) OffsetCoordinator ¶
func (b *Broker) OffsetCoordinator(conf kafka.OffsetCoordinatorConf) (kafka.OffsetCoordinator, error)
OffsetCoordinator returns offset coordinator mock instance. It's always successful, so you can always ignore returned error.
func (*Broker) OffsetEarliest ¶
OffsetEarliest return result of OffsetEarliestHandler callback set on the broker. If not set, always return ErrUnknownTopicOrPartition
func (*Broker) OffsetLatest ¶
OffsetLatest return result of OffsetLatestHandler callback set on the broker. If not set, always return ErrUnknownTopicOrPartition
func (*Broker) Producer ¶
func (b *Broker) Producer(kafka.ProducerConf) kafka.Producer
Producer returns producer mock instance.
Example ¶
broker := NewBroker() msg := &proto.Message{Value: []byte("first")} producer := broker.Producer(kafka.NewProducerConf()) // mock server actions, handling any produce call go func() { resp, err := broker.ReadProducers(time.Millisecond * 20) if err != nil { panic(fmt.Sprintf("failed reading producers: %s", err)) } if len(resp.Messages) != 1 { panic("expected single message") } if !reflect.DeepEqual(resp.Messages[0], msg) { panic("expected different message") } }() // provide data for above goroutine _, err := producer.Produce("my-topic", 0, msg) if err != nil { panic(fmt.Sprintf("cannot produce message: %s", err)) } mockProducer := producer.(*Producer) // test error handling by forcing producer to return error, // // it is possible to manipulate produce result by changing producer's // ResponseOffset and ResponseError attributes mockProducer.ResponseError = errors.New("my spoon is too big!") _, err = producer.Produce("my-topic", 0, msg) fmt.Printf("Error: %s\n", err)
Output: Error: my spoon is too big!
func (*Broker) ReadProducers ¶
func (b *Broker) ReadProducers(timeout time.Duration) (*ProducedMessages, error)
ReadProducers return ProduceMessages representing produce call of one of created by broker producers or ErrTimeout.
type Consumer ¶
type Consumer struct { Broker *Broker // Messages is channel consumed by fetch method call. Pushing message into // this channel will result in Consume method call returning message data. Messages chan *proto.Message // Errors is channel consumed by fetch method call. Pushing error into this // channel will result in Consume method call returning error. Errors chan error // contains filtered or unexported fields }
Consumer mocks kafka's consumer. Use Messages and Errors channels to mock Consume method results.
func (*Consumer) Consume ¶
Consume returns message or error pushed through consumers Messages and Errors channel. Function call will block until data on at least one of those channels is available.
func (*Consumer) SeekToLatest ¶
SeekToLatest discards all messages currently enqueued, unless an error is available first.
type Middleware ¶
Middleware is function that is called for every incomming kafka message, before running default processing handler. Middleware function can return nil or kafka response message.
type OffsetCoordinator ¶
type OffsetCoordinator struct { Broker *Broker // Offsets is used to store all offset commits when using mocked // coordinator's default behaviour. Offsets map[string]int64 // CommitHandler is callback function called whenever Commit method of the // OffsetCoordinator is called. If CommitHandler is nil, Commit method will // return data using Offset attribute as store. CommitHandler func(consumerGroup string, topic string, partition int32, offset int64) error // OffsetHandler is callback function called whenever Offset method of the // OffsetCoordinator is called. If OffsetHandler is nil, Commit method will // use Offset attribute to retrieve the offset. OffsetHandler func(consumerGroup string, topic string, partition int32) (offset int64, metadata string, err error) // contains filtered or unexported fields }
func (*OffsetCoordinator) Close ¶
func (c *OffsetCoordinator) Close()
func (*OffsetCoordinator) Commit ¶
func (c *OffsetCoordinator) Commit(topic string, partition int32, offset int64) error
Commit return result of CommitHandler callback set on coordinator. If handler is nil, this method will use Offsets attribute to store data for further use.
func (*OffsetCoordinator) Offset ¶
func (c *OffsetCoordinator) Offset(topic string, partition int32) (offset int64, metadata string, err error)
Offset return result of OffsetHandler callback set on coordinator. If handler is nil, this method will use Offsets attribute to retrieve committed offset. If no offset for given topic and partition pair was saved, proto.ErrUnknownTopicOrPartition is returned.
type ProducedMessages ¶
ProducedMessages represents all arguments used for single Produce method call.
type Producer ¶
type Producer struct { Broker *Broker // ResponseOffset is offset counter returned and incremented by every // Produce method call. By default set to 1. ResponseOffset int64 // ResponseError if set, force Produce method call to instantly return // error, without publishing messages. By default nil. ResponseError error }
Producer mocks kafka's producer.
func (*Producer) Produce ¶
func (p *Producer) Produce(topic string, partition int32, messages ...*proto.Message) (int64, error)
Produce is settings messages Crc and Offset attributes and pushing all passed arguments to broker. Produce call is blocking until pushed message will be read with broker's ReadProduces.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is container for fake kafka server data.
Example ¶
// symulate server latency for all fetch requests delayFetch := func(nodeID int32, reqKind int16, content []byte) Response { if reqKind != proto.FetchReqKind { return nil } time.Sleep(time.Millisecond * 500) return nil } server := NewServer(delayFetch) server.MustSpawn() defer func() { _ = server.Close() }() fmt.Printf("running server: %s", server.Addr()) server.AddMessages("my-topic", 0, &proto.Message{Value: []byte("first")}, &proto.Message{Value: []byte("second")}) // connect to server using broker and fetch/write messages
Output:
func NewServer ¶
func NewServer(middlewares ...Middleware) *Server
NewServer return new mock server instance. Any number of middlewares can be passed to customize request handling. For every incomming request, all middlewares are called one after another in order they were passed. If any middleware return non nil response message, response is instasntly written to the client and no further code execution for the request is made -- no other middleware is called nor the default handler is executed.
func (*Server) AddMessages ¶
AddMessages append messages to given topic/partition. If topic or partition does not exists, it is being created. To only create topic/partition, call this method withough giving any message.
func (*Server) MustSpawn ¶
func (s *Server) MustSpawn()
MustSpawn run server in the background on random port. It panics if server cannot be spawned. Use Close method to stop spawned server.
func (*Server) ResetTopic ¶
ResetTopic removes all messages and committed offsets for a topic, but does not remove the topic or its partitions.