broker

package
v0.0.0-...-75be81d Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2024 License: MIT Imports: 1 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Process

func Process[Item any](ctx context.Context, message Message[Item], handler HandlerFunc[Item]) error

Types

type Broker

// Broker is the combination of Consumer and Publisher for a single Item
// type: any implementation must provide both the Consume and the Publish
// methods declared by the embedded interfaces.
type Broker[Item any] interface {
	Consumer[Item]
	Publisher[Item]
}

type Consumer

// Consumer is the receiving side of a broker for values of type Item.
type Consumer[Item any] interface {
	// Consume takes a context, a list of topic names and a ConsumerSettings
	// value, and returns a receive-only channel of Message[Item] together
	// with an error.
	//
	// NOTE(review): presumably the channel delivers messages from the given
	// topics and the error reports a failed subscription; when (or whether)
	// the channel is closed is implementation-defined and not visible
	// here — confirm against the concrete implementations.
	Consume(ctx context.Context, topics []string, settings ConsumerSettings) (<-chan Message[Item], error)
}

type ConsumerInitializationPolicy

// ConsumerInitializationPolicy is a string-based setting used as the
// InitializationPolicy field of ConsumerSettings.
//
// NOTE(review): judging by the value names, it presumably selects the offset
// a consumer starts reading from when it has no prior position — confirm
// against the concrete broker implementations.
type ConsumerInitializationPolicy string

// Predefined ConsumerInitializationPolicy values.
//
// NOTE(review): these are declared with var rather than const, so other code
// can reassign them; consider whether that is intended.
var (
	// OldestOffset has the string value "oldest".
	OldestOffset ConsumerInitializationPolicy = "oldest"
	// NewestOffset has the string value "newest".
	NewestOffset ConsumerInitializationPolicy = "newest"
)

type ConsumerSettings

// ConsumerSettings is the configuration value passed to Consumer.Consume.
type ConsumerSettings struct {
	// Group is a free-form string.
	// NOTE(review): presumably the consumer-group identifier — confirm.
	Group                string
	// InitializationPolicy is one of the ConsumerInitializationPolicy values
	// (for example OldestOffset or NewestOffset).
	InitializationPolicy ConsumerInitializationPolicy
}

type HandlerFunc

// HandlerFunc is the signature of a callback that receives a context and a
// single value of type Item and returns an error. It is the handler type
// accepted by the package-level Process function and stored in
// Processor.Handler.
type HandlerFunc[Item any] func(ctx context.Context, ev Item) error

type Message

// Message is the element type of the channel returned by Consumer.Consume.
// It exposes a value of type Item plus two argument-less signalling methods.
type Message[Item any] interface {
	// Item returns a value of the message's Item type.
	// NOTE(review): presumably the payload carried by the message — confirm.
	Item() Item
	// Ack takes no arguments and returns nothing.
	// NOTE(review): presumably acknowledges successful processing — confirm
	// against implementations.
	Ack()
	// Nack takes no arguments and returns nothing.
	// NOTE(review): presumably signals failed processing (negative
	// acknowledgement); redelivery behaviour is not visible here — confirm.
	Nack()
}

type Processor

// Processor bundles a HandlerFunc with an error callback. Its Process method
// accepts a context and a receive-only channel of Message[Item]; how the two
// fields are invoked is defined by that method's body, which is not shown
// here.
type Processor[Item any] struct {
	// Handler is the per-item callback.
	Handler      HandlerFunc[Item]
	// ErrorHandler receives a context, an item and an error.
	// NOTE(review): presumably called when Handler returns a non-nil error
	// for ev; whether a nil ErrorHandler is tolerated is unknown — confirm.
	ErrorHandler func(ctx context.Context, ev Item, err error)
}

func (Processor[Item]) Process

func (processor Processor[Item]) Process(ctx context.Context, channel <-chan Message[Item])

type Publisher

// Publisher is the sending side of a broker for values of type Item.
type Publisher[Item any] interface {
	// Publish takes a context, a topic name and a single item, and returns
	// an error.
	// NOTE(review): presumably a nil error means the item was accepted by
	// the broker; delivery guarantees are implementation-defined — confirm.
	Publish(ctx context.Context, topic string, item Item) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL