watermill

package
v0.0.0-...-75be81d Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2024 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Agent

// Agent pairs a SubscriberFactory with a message.Publisher.
// Values of this type are returned by the Kafka, Nats and Redis constructors
// and are held by Broker in its Agent field.
type Agent struct {
	// SubscriberFactory yields a message.Subscriber through its New method.
	SubscriberFactory SubscriberFactory
	// Publisher is the message.Publisher carried by this Agent.
	Publisher         message.Publisher
}

func Kafka

// Kafka returns an Agent, or an error, given a list of brokers, a template
// kafka.SubscriberConfig, a template kafka.PublisherConfig and a logger.
//
// NOTE(review): the body is not shown here; which template fields are
// overridden (presumably at least the broker list) should be confirmed
// against the implementation.
func Kafka(
	brokers []string,
	subscriberConfigTemplate kafka.SubscriberConfig,
	publisherConfigTemplate kafka.PublisherConfig,
	logger *slog.Logger,
) (Agent, error)

func Nats

// Nats returns an Agent, or an error, given a URL string, a template
// nats.SubscriberConfig, a template nats.PublisherConfig and a logger.
//
// NOTE(review): the body is not shown here; which template fields are
// overridden (presumably at least the URL) should be confirmed against the
// implementation.
func Nats(
	url string,
	subscriberConfigTemplate nats.SubscriberConfig,
	publisherConfigTemplate nats.PublisherConfig,
	logger *slog.Logger,
) (Agent, error)

func Redis

// Redis returns an Agent, or an error, given a redis.UniversalClient, a
// template redisstream.SubscriberConfig and a template
// redisstream.PublisherConfig. Unlike Kafka and Nats, it takes no logger
// parameter.
//
// NOTE(review): the body is not shown here; which template fields are
// overridden (presumably at least the client) should be confirmed against the
// implementation.
func Redis(
	client redis.UniversalClient,
	subscriberConfigTemplate redisstream.SubscriberConfig,
	publisherConfigTemplate redisstream.PublisherConfig,
) (Agent, error)

type Broker

// Broker is a generic type, parameterised by the Item type, that exposes
// Consume and Publish methods.
type Broker[Item any] struct {
	// Agent carries the SubscriberFactory and message.Publisher.
	Agent                     Agent
	// ItemCodec is a codec.Codec between Item and []byte.
	// NOTE(review): presumably used to serialise items for publishing and
	// deserialise them when consuming — confirm against the implementation.
	ItemCodec                 codec.Codec[Item, []byte]
	// Logger is the *slog.Logger held by this Broker.
	Logger                    *slog.Logger
	// ConsumerChannelBufferSize is presumably the buffer size of the channel
	// returned by Consume — TODO confirm against the implementation.
	ConsumerChannelBufferSize int
}

func (Broker[Item]) Consume

// Consume takes a context, a list of topics and broker.ConsumerSettings, and
// returns a receive-only channel of broker.Message[Item] or an error.
//
// NOTE(review): when the channel is closed and how ctx cancellation is
// honoured are not visible here — confirm against the implementation.
func (b Broker[Item]) Consume(ctx context.Context, topics []string, settings broker.ConsumerSettings) (<-chan broker.Message[Item], error)

func (Broker[Item]) Publish

// Publish takes a topic and an item and returns an error.
// The context parameter is declared as the blank identifier, so it is not
// used by this method.
//
// NOTE(review): presumably the item is encoded with ItemCodec and sent via
// Agent.Publisher — confirm against the implementation.
func (b Broker[Item]) Publish(_ context.Context, topic string, item Item) error

type FunctionalSubscriberFactory

// FunctionalSubscriberFactory is a function type whose parameters and results
// match those of SubscriberFactory.New: it takes a context and
// broker.ConsumerSettings and returns a message.Subscriber or an error.
//
// NOTE(review): a New method is declared on this type; presumably it calls
// the function itself so that a plain function can satisfy SubscriberFactory —
// confirm against the implementation.
type FunctionalSubscriberFactory func(
	ctx context.Context,
	settings broker.ConsumerSettings,
) (message.Subscriber, error)

func (FunctionalSubscriberFactory) New

type Message

// Message is a generic type, parameterised by the Item type, with Ack, Nack,
// Item and Topic methods. Its fields are not exported.
type Message[Item any] struct {
	// contains filtered or unexported fields
}

func (Message[Item]) Ack

// Ack takes no arguments and returns nothing.
// NOTE(review): presumably acknowledges the underlying message — confirm
// against the implementation.
func (mes Message[Item]) Ack()

func (Message[Item]) Item

// Item returns a value of the Message's Item type parameter.
// NOTE(review): presumably the decoded payload — confirm against the
// implementation.
func (mes Message[Item]) Item() Item

func (Message[Item]) Nack

// Nack takes no arguments and returns nothing.
// NOTE(review): presumably negatively acknowledges the underlying message —
// confirm against the implementation.
func (mes Message[Item]) Nack()

func (Message[Item]) Topic

// Topic returns a string.
// NOTE(review): presumably the topic the message was received on — confirm
// against the implementation.
func (mes Message[Item]) Topic() string

type SubscriberFactory

// SubscriberFactory is a single-method interface for obtaining a
// message.Subscriber. It is the type of Agent's SubscriberFactory field.
type SubscriberFactory interface {
	// New takes a context and broker.ConsumerSettings and returns a
	// message.Subscriber or an error.
	New(ctx context.Context, settings broker.ConsumerSettings) (message.Subscriber, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL