Documentation ¶
Index ¶
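- type TestClient
- func NewTestClient[EventType any](t *testing.T, ctx context.Context, cfg TestClientConfig) *TestClient[EventType]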
- func (c *TestClient[EventType]) Client() *kafka.Client
- func (c *TestClient[EventType]) ConsumeEvent(t *testing.T, ctx context.Context) EventType
- func (c *TestClient[EventType]) PublishEvents(t *testing.T, ctx context.Context, events ...EventType)
- func (c *TestClient[EventType]) PublishMessages(t *testing.T, ctx context.Context, msgs ...kafka.Message)
- func (c *TestClient[EventType]) Reader() *kafka.Reader
- func (c *TestClient[EventType]) Registry() *TestRegistry[EventType]
- func (c *TestClient[EventType]) TopicMessageCount(t *testing.T, ctx context.Context) int64
- func (c *TestClient[EventType]) Writer() *kafka.Writer
- type TestClientConfig
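- type TestRegistry
- func NewTestRegistry[EventType any](t *testing.T, ctx context.Context, hostPort string, subject string) *TestRegistry[EventType]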
Types ¶
type TestClient ¶
type TestClient[EventType any] struct {
	// Topic is the name of the created test topic
	Topic string
	// contains filtered or unexported fields
}
TestClient is a Kafka client that lets you easily set up a Kafka topic and interact with it for testing.
TestClient accepts a type parameter EventType, which is the raw type of the events published to and consumed from the topic. It is used within TestClient to simplify encoding and decoding.
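To illustrate why a type parameter simplifies encoding and decoding, here is a minimal sketch that uses only the standard library. The `codec` type, the `roundTrip` helper, and the `orderCreated` event are hypothetical names made up for this example, and JSON stands in for whatever encoding the package applies through its registry; none of this is the package's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// codec is a hypothetical stand-in for an encoding layer. It is not part of
// the documented package; JSON is assumed here purely for illustration.
type codec[EventType any] struct{}

// encode serializes one event into message bytes.
func (codec[EventType]) encode(ev EventType) ([]byte, error) {
	return json.Marshal(ev)
}

// decode fills a fresh EventType value from message bytes, so callers get a
// typed result without any cast.
func (codec[EventType]) decode(data []byte) (EventType, error) {
	var ev EventType
	err := json.Unmarshal(data, &ev)
	return ev, err
}

// orderCreated is a hypothetical event type used only for this example.
type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// roundTrip encodes an event and decodes it again through the codec.
func roundTrip[EventType any](ev EventType) (EventType, error) {
	var c codec[EventType]
	data, err := c.encode(ev)
	if err != nil {
		var zero EventType
		return zero, err
	}
	return c.decode(data)
}

func main() {
	got, err := roundTrip(orderCreated{ID: "o-1", Total: 42})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", got)
}
```

Because the event type is fixed once at construction, every later call can accept and return that type directly, which is the convenience the type parameter is meant to provide.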
func NewTestClient ¶
func NewTestClient[EventType any](t *testing.T, ctx context.Context, cfg TestClientConfig) *TestClient[EventType]
NewTestClient creates a new test topic in Kafka and returns a TestClient.
func (*TestClient[EventType]) Client ¶
func (c *TestClient[EventType]) Client() *kafka.Client
Client returns the internal kafka client.
func (*TestClient[EventType]) ConsumeEvent ¶
func (c *TestClient[EventType]) ConsumeEvent(t *testing.T, ctx context.Context) EventType
ConsumeEvent reads the next message from the topic and commits the offset. The message is decoded into a new value of type EventType, which is returned.
func (*TestClient[EventType]) PublishEvents ¶
func (c *TestClient[EventType]) PublishEvents(t *testing.T, ctx context.Context, events ...EventType)
PublishEvents generates kafka messages from the given raw events and publishes them to the test topic.
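The publish-then-consume flow can be pictured with an in-memory stand-in. This is only a sketch: `memTopic`, its methods, and the `userSignedUp` event are hypothetical, no broker is involved, JSON is an assumed encoding, and advancing an index stands in for committing an offset. Unlike the documented methods, which take a `*testing.T`, the stand-in returns errors so that it can run as a plain program.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// memTopic is a hypothetical in-memory stand-in for a test topic. It mirrors
// the publish/consume shape of the documented methods but talks to no broker.
type memTopic[EventType any] struct {
	messages [][]byte // encoded events, in publish order
	offset   int      // index of the next message to consume
}

// publishEvents encodes each event (JSON is an assumption) and appends it.
func (m *memTopic[EventType]) publishEvents(events ...EventType) error {
	for _, ev := range events {
		data, err := json.Marshal(ev)
		if err != nil {
			return err
		}
		m.messages = append(m.messages, data)
	}
	return nil
}

// consumeEvent decodes the next message into a new EventType value and then
// advances the offset, which stands in for committing it.
func (m *memTopic[EventType]) consumeEvent() (EventType, error) {
	var ev EventType
	if m.offset >= len(m.messages) {
		return ev, fmt.Errorf("no message at offset %d", m.offset)
	}
	if err := json.Unmarshal(m.messages[m.offset], &ev); err != nil {
		return ev, err
	}
	m.offset++
	return ev, nil
}

// userSignedUp is a hypothetical event type used only for this example.
type userSignedUp struct {
	Email string `json:"email"`
}

func main() {
	topic := &memTopic[userSignedUp]{}
	err := topic.publishEvents(
		userSignedUp{Email: "a@example.com"},
		userSignedUp{Email: "b@example.com"},
	)
	if err != nil {
		panic(err)
	}
	// Events come back in publish order, one per call.
	for i := 0; i < 2; i++ {
		ev, err := topic.consumeEvent()
		if err != nil {
			panic(err)
		}
		fmt.Println(ev.Email)
	}
}
```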
func (*TestClient[EventType]) PublishMessages ¶
func (c *TestClient[EventType]) PublishMessages(t *testing.T, ctx context.Context, msgs ...kafka.Message)
PublishMessages publishes the given kafka messages to the test topic.
func (*TestClient[EventType]) Reader ¶
func (c *TestClient[EventType]) Reader() *kafka.Reader
Reader returns the internal kafka reader.
func (*TestClient[EventType]) Registry ¶
func (c *TestClient[EventType]) Registry() *TestRegistry[EventType]
Registry returns the internal kafka test registry.
func (*TestClient[EventType]) TopicMessageCount ¶
func (c *TestClient[EventType]) TopicMessageCount(t *testing.T, ctx context.Context) int64
TopicMessageCount returns the total number of messages that are currently in the test topic.
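The source does not say how the count is obtained. One common way to count the messages in a topic is to take, for each partition, the difference between its last and first offsets and add those up. The sketch below shows only that arithmetic; `partitionOffsets` and `topicMessageCount` are hypothetical names, and the offsets are hard-coded rather than fetched from a broker.

```go
package main

import "fmt"

// partitionOffsets is a hypothetical record of one partition's offset range.
type partitionOffsets struct {
	First int64 // offset of the oldest retained message
	Last  int64 // offset that the next produced message would receive
}

// topicMessageCount sums the per-partition offset ranges.
func topicMessageCount(parts []partitionOffsets) int64 {
	var total int64
	for _, p := range parts {
		total += p.Last - p.First
	}
	return total
}

func main() {
	parts := []partitionOffsets{
		{First: 0, Last: 3}, // three messages
		{First: 2, Last: 5}, // three messages, two older ones already removed
		{First: 0, Last: 0}, // empty partition
	}
	fmt.Println(topicMessageCount(parts)) // prints 6
}
```

On a freshly created test topic without log compaction or transactional writes, this difference should match the number of messages; with either of those features enabled, it can overcount.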
func (*TestClient[EventType]) Writer ¶
func (c *TestClient[EventType]) Writer() *kafka.Writer
Writer returns the internal kafka writer.
type TestClientConfig ¶
type TestClientConfig struct {
	KafkaBrokerHostPort    string
	SchemaRegistryHostPort string
	NumTopicPartitions     int
}
TestClientConfig is a configuration object used to create a new TestClient.
type TestRegistry ¶
type TestRegistry[EventType any] struct {
	// contains filtered or unexported fields
}
TestRegistry is a kafka registry used to easily encode/decode messages for testing.
TestRegistry accepts a type parameter EventType, which is the raw type of the events to encode and decode.
func NewTestRegistry ¶
func NewTestRegistry[EventType any](t *testing.T, ctx context.Context, hostPort string, subject string) *TestRegistry[EventType]
NewTestRegistry returns a new TestRegistry. During creation, the schema for EventType is automatically generated and registered under a test subject.
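The source does not state the schema format or how it is derived. As a rough illustration of how a schema might be generated from a Go type parameter, the sketch below uses reflection to list a struct's exported fields. `fieldSchema`, `schemaFor`, and `paymentReceived` are hypothetical names, and the output is a plain field list, not a real registry schema.

```go
package main

import (
	"fmt"
	"reflect"
)

// fieldSchema describes one struct field. The shape is hypothetical and far
// simpler than a real registry schema.
type fieldSchema struct {
	Name string
	Type string
}

// schemaFor lists the exported fields of EventType, which must be a struct.
func schemaFor[EventType any]() ([]fieldSchema, error) {
	var zero EventType
	typ := reflect.TypeOf(zero)
	if typ == nil || typ.Kind() != reflect.Struct {
		return nil, fmt.Errorf("EventType must be a struct, got %v", typ)
	}
	fields := make([]fieldSchema, 0, typ.NumField())
	for i := 0; i < typ.NumField(); i++ {
		f := typ.Field(i)
		if !f.IsExported() {
			continue // unexported fields would not be encoded
		}
		fields = append(fields, fieldSchema{Name: f.Name, Type: f.Type.Kind().String()})
	}
	return fields, nil
}

// paymentReceived is a hypothetical event type used only for this example.
type paymentReceived struct {
	ID          string
	AmountCents int64
	note        string // unexported, so it is skipped
}

func main() {
	fields, err := schemaFor[paymentReceived]()
	if err != nil {
		panic(err)
	}
	for _, f := range fields {
		fmt.Printf("%s: %s\n", f.Name, f.Type)
	}
}
```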