kafkatest

package
v0.0.54
Published: Apr 7, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type TestClient

type TestClient[EventType any] struct {
	// Topic is the name of the created test topic
	Topic string
	// contains filtered or unexported fields
}

TestClient is a Kafka client that lets you easily set up a Kafka topic and interact with it for testing.

TestClient accepts a type parameter EventType, which is the raw type of the events published to and consumed from the topic. It is used within TestClient to simplify encoding and decoding.
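To illustrate why a type parameter simplifies encoding and decoding, here is a minimal in-memory sketch. It does not use this package or a real broker: memoryTopic and OrderCreated are hypothetical names, JSON stands in for the real wire encoding, and errors are returned rather than reported through *testing.T.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// OrderCreated is a hypothetical event type used only for this sketch.
type OrderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// memoryTopic is a hypothetical in-memory stand-in for a Kafka topic.
// It is generic over EventType, so callers never handle raw bytes.
type memoryTopic[EventType any] struct {
	messages [][]byte // encoded payloads, in publish order
	next     int      // index of the next message to consume
}

// PublishEvents encodes each event and appends it to the topic.
func (m *memoryTopic[EventType]) PublishEvents(events ...EventType) error {
	for _, e := range events {
		b, err := json.Marshal(e) // assumption: JSON replaces the real encoding
		if err != nil {
			return err
		}
		m.messages = append(m.messages, b)
	}
	return nil
}

// ConsumeEvent decodes the next message into a fresh EventType value.
func (m *memoryTopic[EventType]) ConsumeEvent() (EventType, error) {
	var out EventType
	if m.next >= len(m.messages) {
		return out, errors.New("no more messages")
	}
	if err := json.Unmarshal(m.messages[m.next], &out); err != nil {
		return out, err
	}
	m.next++ // advancing the index plays the role of committing the offset
	return out, nil
}

func main() {
	topic := &memoryTopic[OrderCreated]{}
	if err := topic.PublishEvents(OrderCreated{ID: "a1", Total: 42}); err != nil {
		panic(err)
	}
	got, err := topic.ConsumeEvent()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", got)
}
```

Because the type is fixed when the topic is created, the test body works with OrderCreated values directly and the encoding step stays in one place.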

func NewTestClient

func NewTestClient[EventType any](t *testing.T, ctx context.Context, cfg TestClientConfig) *TestClient[EventType]

NewTestClient creates a new test topic in Kafka and returns a TestClient.

func (*TestClient[EventType]) Client

func (c *TestClient[EventType]) Client() *kafka.Client

Client returns the internal kafka client.

func (*TestClient[EventType]) ConsumeEvent

func (c *TestClient[EventType]) ConsumeEvent(t *testing.T, ctx context.Context) EventType

ConsumeEvent reads the next message from the topic and commits the offset. The message is decoded into a new value of EventType and returned.

func (*TestClient[EventType]) PublishEvents

func (c *TestClient[EventType]) PublishEvents(t *testing.T, ctx context.Context, events ...EventType)

PublishEvents encodes each of the given events as a Kafka message and publishes the messages to the test topic.

func (*TestClient[EventType]) PublishMessages

func (c *TestClient[EventType]) PublishMessages(t *testing.T, ctx context.Context, msgs ...kafka.Message)

PublishMessages publishes the given Kafka messages to the test topic.

func (*TestClient[EventType]) Reader

func (c *TestClient[EventType]) Reader() *kafka.Reader

Reader returns the internal kafka reader.

func (*TestClient[EventType]) Registry

func (c *TestClient[EventType]) Registry() *TestRegistry[EventType]

Registry returns the internal kafka test registry.

func (*TestClient[EventType]) TopicMessageCount

func (c *TestClient[EventType]) TopicMessageCount(t *testing.T, ctx context.Context) int64

TopicMessageCount returns the total number of messages that are currently in the test topic.
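The documentation does not say how the count is computed. One common approach, shown here purely as an assumption, is to sum the difference between the last and first offsets of every partition. The partitionOffsets type and messageCount function are hypothetical and do not talk to a broker.

```go
package main

import "fmt"

// partitionOffsets holds hypothetical offsets for a single partition.
type partitionOffsets struct {
	First int64 // earliest offset still available in the partition
	Last  int64 // offset the next written message would receive
}

// messageCount sums Last-First over all partitions.
// Assumption: offsets are contiguous (no compaction or transaction markers).
func messageCount(parts []partitionOffsets) int64 {
	var total int64
	for _, p := range parts {
		total += p.Last - p.First
	}
	return total
}

func main() {
	parts := []partitionOffsets{
		{First: 0, Last: 5},   // 5 messages
		{First: 10, Last: 12}, // 2 messages
		{First: 7, Last: 7},   // empty partition
	}
	fmt.Println(messageCount(parts)) // prints 7
}
```

On compacted topics, or topics written with transactions, offset arithmetic can overcount, so treat a result obtained this way as an upper bound.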

func (*TestClient[EventType]) Writer

func (c *TestClient[EventType]) Writer() *kafka.Writer

Writer returns the internal kafka writer.

type TestClientConfig

type TestClientConfig struct {
	KafkaBrokerHostPort    string
	SchemaRegistryHostPort string
	NumTopicPartitions     int
}

TestClientConfig is a configuration object used to create a new TestClient.

type TestRegistry

type TestRegistry[EventType any] struct {
	// contains filtered or unexported fields
}

TestRegistry is a Kafka schema registry client used to easily encode and decode messages for testing.

TestRegistry accepts a type parameter EventType, which is the raw type of the events to encode from and decode into.

func NewTestRegistry

func NewTestRegistry[EventType any](t *testing.T, ctx context.Context, hostPort string, subject string) *TestRegistry[EventType]

NewTestRegistry returns a new TestRegistry. During creation, the schema for EventType is generated automatically and registered under a test subject.

func (*TestRegistry[EventType]) Decode

func (r *TestRegistry[EventType]) Decode(t *testing.T, ctx context.Context, data []byte) EventType

Decode unmarshals the given message into a new value of EventType and returns it.

func (*TestRegistry[EventType]) Encode

func (r *TestRegistry[EventType]) Encode(t *testing.T, ctx context.Context, event EventType) []byte

Encode returns the event marshaled using the Avro binary encoding.
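To give a feel for what Avro binary encoding produces, the sketch below hand-encodes a two-field record (a long followed by a string) using only the standard library. It follows the Avro rules that a long is written as a zig-zag varint, a string as a long length followed by its UTF-8 bytes, and a record as its fields concatenated in schema order. The encodeRecord and decodeRecord helpers are hypothetical and are not how this package is implemented; whether Encode adds any framing around the Avro payload is not stated in this documentation.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// appendLong appends v as an Avro long. binary.PutVarint uses the same
// zig-zag, base-128 variable-length scheme that Avro specifies.
func appendLong(buf []byte, v int64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutVarint(tmp[:], v)
	return append(buf, tmp[:n]...)
}

// encodeRecord encodes a hypothetical record {a: long, b: string}.
func encodeRecord(a int64, b string) []byte {
	buf := appendLong(nil, a)
	buf = appendLong(buf, int64(len(b))) // string length prefix
	return append(buf, b...)
}

// decodeRecord reverses encodeRecord and reports truncated input.
func decodeRecord(data []byte) (int64, string, error) {
	a, n := binary.Varint(data)
	if n <= 0 {
		return 0, "", errors.New("invalid long for field a")
	}
	data = data[n:]
	l, n := binary.Varint(data)
	if n <= 0 || l < 0 || int64(len(data)-n) < l {
		return 0, "", errors.New("invalid or truncated string for field b")
	}
	return a, string(data[n : n+int(l)]), nil
}

func main() {
	encoded := encodeRecord(27, "foo")
	fmt.Printf("% x\n", encoded) // prints 36 06 66 6f 6f

	a, b, err := decodeRecord(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(a, b) // prints 27 foo
}
```

The payload carries no field names or type tags, which is why a decoder needs the writer's schema to interpret the bytes.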
