consumertest

package
v2.0.206+incompatible

This package is not in the latest version of its module.

Published: Mar 20, 2019 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Overview

Package consumertest provides utilities for in-process unit testing of Gazette consumer applications.

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateShards

func CreateShards(c *gc.C, cmr *Consumer, specs ...*consumer.ShardSpec)

CreateShards creates the given shards using the Consumer Apply API, and waits for them to be allocated.

func WaitForShards

func WaitForShards(ctx context.Context, rjc pb.RoutedJournalClient, conn *grpc.ClientConn, sel pb.LabelSelector) error

WaitForShards queries for shards matching LabelSelector |sel|, determines the current write-heads of journals being consumed by matched shards, and polls shards until each has caught up to the determined write-heads of its consumed journals.
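The catch-up check described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the `offsets` type, `caughtUp`, `waitUntilCaughtUp`, and the journal names are hypothetical stand-ins for the shard and journal state that WaitForShards obtains through its clients.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// offsets maps a journal name to a byte offset.
// Hypothetical stand-in type; it is not part of consumertest.
type offsets map[string]int64

// caughtUp reports whether progress has reached the write-head
// of every journal listed in heads.
func caughtUp(progress, heads offsets) bool {
	for journal, head := range heads {
		if progress[journal] < head {
			return false
		}
	}
	return true
}

// waitUntilCaughtUp polls fetch at the given interval until the reported
// progress reaches heads, or until ctx is done.
func waitUntilCaughtUp(ctx context.Context, heads offsets, fetch func() offsets, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		if caughtUp(fetch(), heads) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// demoCatchUp simulates a shard that reads 40 more bytes of one journal
// on every poll, and returns how many polls were needed.
func demoCatchUp() (polls int, err error) {
	// Write-heads are determined once, up front (assumed values).
	heads := offsets{"examples/a": 100, "examples/b": 50}

	fetch := func() offsets {
		polls++
		return offsets{"examples/a": int64(40 * polls), "examples/b": 50}
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	err = waitUntilCaughtUp(ctx, heads, fetch, time.Millisecond)
	return polls, err
}

func main() {
	polls, err := demoCatchUp()
	fmt.Println("polls:", polls, "err:", err)
}
```

Because the write-heads are captured once before polling begins, data appended to a journal afterwards does not extend the wait in this sketch.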

Types

type Args

type Args struct {
	C        *gc.C
	Etcd     *clientv3.Client       // Etcd client instance.
	Journals pb.RoutedJournalClient // Broker client instance.
	App      consumer.Application   // Application of the consumer.
	Root     string                 // Consumer root in Etcd. Defaults to "/consumertest".
	Zone     string                 // Zone of the consumer. Defaults to "local".
	Suffix   string                 // ID Suffix of the consumer. Defaults to "consumer".
}

Args of NewConsumer.

type Consumer

type Consumer struct {
	// Server is a loopback Server created for this Consumer, which is available
	// for test applications to register APIs against.
	Server *server.Server
	// Service of the Consumer, which is available for test applications.
	Service *consumer.Service
	// contains filtered or unexported fields
}

Consumer is a lightweight, embedded Gazette consumer runtime suitable for in-process testing of consumer applications.

func NewConsumer

func NewConsumer(args Args) *Consumer

NewConsumer builds and returns a Consumer.

func (*Consumer) AllocateIdleCh

func (cmr *Consumer) AllocateIdleCh() <-chan struct{}

AllocateIdleCh signals when the Consumer's Allocate loop has taken an action, such as updating a shard assignment, and has since become idle. Tests must explicitly receive (and confirm as intended) each signal sent on an Allocator action, or the Consumer will panic.
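One way a test might consume such signals is to receive each one with a deadline, so that a missing signal fails fast instead of hanging. The sketch below uses only the standard library; `expectIdle` and `demoIdle` are hypothetical helpers, and the channel is filled by a simulated allocator rather than a real Consumer. With a real Consumer, the value returned by `cmr.AllocateIdleCh()` would presumably be passed in its place.

```go
package main

import (
	"fmt"
	"time"
)

// expectIdle receives one idle signal from idleCh. It returns false if no
// signal arrives within timeout. Hypothetical helper, not part of consumertest.
func expectIdle(idleCh <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-idleCh:
		return true
	case <-time.After(timeout):
		return false
	}
}

// demoIdle simulates an allocator that takes one action and then signals
// that it is idle. The first receive succeeds; the second times out because
// no further action was taken.
func demoIdle() (first, second bool) {
	idleCh := make(chan struct{})

	go func() {
		// Simulated action, such as updating a shard assignment.
		idleCh <- struct{}{}
	}()

	first = expectIdle(idleCh, time.Second)
	second = expectIdle(idleCh, 10*time.Millisecond)
	return first, second
}

func main() {
	first, second := demoIdle()
	fmt.Println("first signal received:", first)
	fmt.Println("second signal received:", second)
}
```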

func (*Consumer) RevokeLease

func (cmr *Consumer) RevokeLease(c *gc.C)

RevokeLease of the Consumer, allowing its Allocate loop to immediately exit.

func (*Consumer) Serve

func (cmr *Consumer) Serve(c *gc.C, ctx context.Context)

Serve the consumer by serving its Server loopback and invoking Allocate. Any APIs attached to the loopback Server should be registered before Serve is called. Serve returns once Allocate completes.

func (*Consumer) WaitForExit

func (cmr *Consumer) WaitForExit(c *gc.C)

WaitForExit waits for the Allocate loop to exit, and completes Consumer teardown. WaitForExit blocks indefinitely unless either the Consumer's shard limit has been zeroed (and another Consumer takes over its assignments) or its lease has been revoked.

func (*Consumer) ZeroShardLimit

func (cmr *Consumer) ZeroShardLimit(c *gc.C)

ZeroShardLimit zeroes the shard limit of the Consumer. The test Consumer will eventually exit, assuming other Consumer(s) are available to take over the assignments.
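The ordering constraint between Serve, RevokeLease, and WaitForExit can be modeled with plain channels. The sketch below is an assumption-laden illustration: `fakeConsumer` and its methods are hypothetical and only mimic the documented blocking behavior (serving returns once the loop completes, and waiting for exit blocks until the lease is revoked). It does not use or reproduce the real Consumer.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeConsumer is a hypothetical stand-in; it is NOT consumertest.Consumer.
type fakeConsumer struct {
	revoked chan struct{} // Closed when the simulated lease is revoked.
	exited  chan struct{} // Closed when the simulated allocate loop returns.
	once    sync.Once     // Guards against closing revoked twice.
}

func newFakeConsumer() *fakeConsumer {
	return &fakeConsumer{
		revoked: make(chan struct{}),
		exited:  make(chan struct{}),
	}
}

// serve blocks until the simulated lease is revoked, then marks the loop
// as exited.
func (f *fakeConsumer) serve() {
	<-f.revoked
	close(f.exited)
}

// revokeLease lets the simulated allocate loop exit immediately.
func (f *fakeConsumer) revokeLease() {
	f.once.Do(func() { close(f.revoked) })
}

// waitForExit blocks until serve has returned. Without a prior revokeLease
// it would block forever, mirroring the documented caveat.
func (f *fakeConsumer) waitForExit() {
	<-f.exited
}

// demoLifecycle runs the three steps in order and records each one.
func demoLifecycle() []string {
	var steps []string
	f := newFakeConsumer()

	go f.serve()
	steps = append(steps, "serving")

	f.revokeLease()
	steps = append(steps, "lease revoked")

	f.waitForExit()
	steps = append(steps, "exited")
	return steps
}

func main() {
	fmt.Println(demoLifecycle())
}
```

Running the serve step in its own goroutine reflects that it blocks until the loop completes, so the test body stays free to trigger the exit and then wait for it.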
