v0.0.8

This package is not in the latest version of its module.

Published: May 7, 2024 | License: Apache-2.0 | Imports: 8 | Imported by: 0






type Consumer

type Consumer[Output pubsub.MessageTransformer[KafkaMessage, Output]] struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer[Output pubsub.MessageTransformer[KafkaMessage, Output]](kafka PubSub, topic string) *Consumer[Output]

NewConsumer creates a consumer that takes Kafka messages and sends them to the destination.

The Output type must implement the MessageTransformer interface from KafkaMessage to Output itself.
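The self-referential constraint on Output can be illustrated with a minimal sketch. The real method set of pubsub.MessageTransformer is not shown on this page, so the interface below, its Transform method, the UpperEvent type, and the convert helper are all hypothetical stand-ins; only the shape of the type parameter mirrors the NewConsumer signature.

```go
package main

import (
	"fmt"
	"strings"
)

// KafkaMessage is a local stand-in mirroring two fields of the documented type.
type KafkaMessage struct {
	Topic   string
	Payload []byte
}

// MessageTransformer is a HYPOTHETICAL stand-in for pubsub.MessageTransformer.
// The method name Transform is an assumption, not the library's API.
type MessageTransformer[In, Out any] interface {
	Transform(In) (Out, error)
}

// UpperEvent is an example Output type that builds itself from a KafkaMessage.
type UpperEvent struct {
	Topic string
	Text  string
}

// Transform converts a KafkaMessage into an UpperEvent and rejects empty payloads.
func (UpperEvent) Transform(m KafkaMessage) (UpperEvent, error) {
	if len(m.Payload) == 0 {
		return UpperEvent{}, fmt.Errorf("empty payload on topic %q", m.Topic)
	}
	return UpperEvent{Topic: m.Topic, Text: strings.ToUpper(string(m.Payload))}, nil
}

// convert uses the same constraint shape as NewConsumer: Output must be able
// to transform a KafkaMessage into Output.
func convert[Output MessageTransformer[KafkaMessage, Output]](m KafkaMessage) (Output, error) {
	var zero Output // the zero value is enough to reach the method
	return zero.Transform(m)
}

func main() {
	ev, err := convert[UpperEvent](KafkaMessage{Topic: "orders", Payload: []byte("created")})
	fmt.Println(ev.Topic, ev.Text, err)
}
```

With a constraint of this shape, the compiler rejects any Output type that cannot produce itself from a KafkaMessage, so the mismatch surfaces at the NewConsumer call site rather than at run time.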

func (*Consumer[Output]) Run

func (k *Consumer[Output]) Run(ctx context.Context, ch chan<- Output, filter channel.MessageFilter[Output]) error

type Header

type Header struct {
	Key   string
	Value []byte
}

type KafkaMessage

type KafkaMessage struct {
	Topic   string
	Payload []byte
	Headers []kgo.RecordHeader
	Done    chan error
}

type KafkaMessageFilter

type KafkaMessageFilter struct {
	Topic string
}

func (*KafkaMessageFilter) Filter

func (k *KafkaMessageFilter) Filter(message KafkaMessage) bool
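The page does not document what Filter checks. A plausible reading, given the single Topic field, is that a message is accepted when its topic matches. The sketch below re-declares the types locally and implements that assumed behaviour; the keep helper is hypothetical and exists only for the example.

```go
package main

import "fmt"

// KafkaMessage is a local stand-in mirroring two fields of the documented type.
type KafkaMessage struct {
	Topic   string
	Payload []byte
}

// KafkaMessageFilter mirrors the documented struct.
type KafkaMessageFilter struct {
	Topic string
}

// Filter reports whether the message belongs to the filter's topic.
// ASSUMPTION: the real implementation may apply different rules.
func (k *KafkaMessageFilter) Filter(message KafkaMessage) bool {
	return message.Topic == k.Topic
}

// keep is a hypothetical helper returning only the messages the filter accepts.
func keep(f *KafkaMessageFilter, msgs []KafkaMessage) []KafkaMessage {
	var out []KafkaMessage
	for _, m := range msgs {
		if f.Filter(m) {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	f := &KafkaMessageFilter{Topic: "orders"}
	msgs := []KafkaMessage{
		{Topic: "orders", Payload: []byte("a")},
		{Topic: "payments", Payload: []byte("b")},
		{Topic: "orders", Payload: []byte("c")},
	}
	for _, m := range keep(f, msgs) {
		fmt.Println(m.Topic, string(m.Payload))
	}
}
```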

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

func New

func New(kafkaBroker string, consumerGroup string, blocking bool, topics ...string) PubSub

func NewBlockingPubSub

func NewBlockingPubSub(kafkaBroker string, consumerGroup string, topics ...string) PubSub

NewBlockingPubSub creates a new PubSub instance with blocking behavior

func NewPubSub

func NewPubSub(kafkaBroker string, consumerGroup string, topics ...string) PubSub

NewPubSub creates a new PubSub instance with non-blocking behavior

func (*PubSub) Listen

func (k *PubSub) Listen()

func (*PubSub) Produce

func (k *PubSub) Produce(topic string, value []byte, callback func(error), headers ...Header)

func (*PubSub) Subscribe

func (k *PubSub) Subscribe(topic string) (<-chan KafkaMessage, error)

func (*PubSub) Unsubscribe

func (k *PubSub) Unsubscribe(c <-chan KafkaMessage) error
