kafka

package
v0.1.9 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 8, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	ClusterConfig                         *sarama.Config
	BrokerAddresses                       []string
	Topic                                 string
	GroupID                               string
	MaxInFlight                           int64
	CertPEMBlock, KeyPEMBlock, CaPEMCerts []byte
}
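
The CertPEMBlock, KeyPEMBlock, and CaPEMCerts fields suggest TLS client authentication toward the brokers, although how Connect consumes them is not documented here. The following is a minimal sketch, assuming the fields hold a PEM-encoded client certificate, its private key, and a CA bundle, of how such blocks are commonly turned into a *tls.Config using only the standard library. The helper name tlsConfigFromPEM is hypothetical and not part of this package.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// tlsConfigFromPEM is a hypothetical helper (an assumption, not part of
// this package). It builds a client TLS configuration from PEM-encoded
// certificate, key, and CA inputs.
func tlsConfigFromPEM(certPEM, keyPEM, caPEM []byte) (*tls.Config, error) {
	// Parse the client certificate together with its private key.
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, fmt.Errorf("parse client key pair: %w", err)
	}

	// Collect the CA certificates used to verify the brokers.
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificates found in PEM input")
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		MinVersion:   tls.VersionTLS12,
	}, nil
}

func main() {
	// Non-PEM input is rejected before any connection is attempted.
	_, err := tlsConfigFromPEM([]byte("not pem"), []byte("not pem"), nil)
	fmt.Println("got error:", err != nil)
}
```

With sarama, a configuration like this is usually attached through the Net.TLS.Config field of sarama.Config, with Net.TLS.Enable set to true. Whether Connect does this on the caller's behalf is not stated on this page.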

func (*Config) Connect

func (c *Config) Connect() (sarama.ConsumerGroup, error)

Connect returns a sarama.ConsumerGroup for this Config, or an error if the group cannot be created.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(config *Config, logger *zerolog.Logger) (*Consumer, error)

NewConsumer sets up a watermill subscriber and returns a Consumer.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context, process func(messages <-chan *message.Message))

Start reads messages from the subscriber and processes them with the passed-in function.

For example:

	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
		msg.Ack()
	}
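
As a slightly fuller sketch of a process function, the example below drains the channel until it is closed and acknowledges each message. To stay runnable without external modules it uses a stand-in Message type that mimics only the UUID, Payload, and Ack members used in the snippet above. In real code the channel would carry *message.Message from watermill and the function would be passed to Consumer.Start. The names Message and process are illustrative assumptions.

```go
package main

import "fmt"

// Message is a stand-in (an assumption for this sketch) for watermill's
// message.Message. It models only the members used by the loop.
type Message struct {
	UUID    string
	Payload []byte
	acked   bool
}

// Ack marks the message as acknowledged and reports success.
func (m *Message) Ack() bool {
	m.acked = true
	return true
}

// process has the same shape as the callback expected by Start, except
// that it takes the stand-in type. It returns once the channel is closed.
func process(messages <-chan *Message) {
	for msg := range messages {
		fmt.Printf("received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))
		msg.Ack()
	}
}

func main() {
	// Simulate a subscriber by pre-filling and closing a buffered channel.
	ch := make(chan *Message, 2)
	ch <- &Message{UUID: "1", Payload: []byte("hello")}
	ch <- &Message{UUID: "2", Payload: []byte("world")}
	close(ch)

	process(ch)
}
```

The range loop exits only when the channel is closed. Presumably that happens when the context passed to Start is cancelled, but this page does not say so, and it is worth confirming before relying on it for shutdown.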
