kp

v1.15.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 10, 2024 License: Apache-2.0 Imports: 14 Imported by: 1

README

kp

Library for handling Kafka messages with retries

How it works

Each KP instance creates a Sarama Kafka client that listens to two topics: the main topic and the retry topic. The retry topic name is prefixed with the consumer group name, so each consumer group has its own retry and dead letter topics. This covers the case where two services listen to the same topic but only one of them fails to process a message: the message is retried only for the failing service and not for the other one.

Code Example: (see examples)

func main() {
	cfg, err := config.LoadConfig()
	if err != nil {
		panic(err)
	}
	retryCount := 10
	consumerGroup := "simple-service" // usually the name of your service
	backoffDuration := time.Second * 1 // set to 0 for no backoff

	processor := kp.NewKafkaProcessor("main-topic", "retry-main-topic", "dead-main-topic", retryCount, consumerGroup, kp.KafkaConfig{KafkaBootstrapServers: strings.Split(cfg.KafkaConfig.KafkaBootstrapServers, ",")}, backoffDuration)
	// retry topic becomes "simple-service-retry-main-topic"
	// dead letter topic becomes "simple-service-dead-main-topic"
	processor.Process(func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error {
		err := processMessage(message)
		if err != nil {
			log.Printf("Error processing message: %s", err)
			return err
		}
		log.Println("successfully processed message")
		return nil
	})

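	// Start takes a context and returns an error (see the Start signature in the documentation).
	if err := processor.Start(context.Background()); err != nil {
		log.Fatalf("processor stopped with error: %s", err)
	}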

}

To disable retries, set retries to 0.
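The routing decision implied by the retry count can be sketched as follows. This is a minimal illustration and not kp's actual implementation: the function nextTopic and its parameters are hypothetical, and the sketch assumes that a failed message goes to the retry topic while attempts remain and to the dead letter topic otherwise.

```go
package main

import "fmt"

// nextTopic is a hypothetical helper, not part of kp's API.
// It picks the topic a failed message would be republished to:
// the retry topic while the message has been retried fewer than
// maxRetries times, otherwise the dead letter topic (assumption).
func nextTopic(retries, maxRetries int, retryTopic, deadLetterTopic string) string {
	if retries < maxRetries {
		return retryTopic
	}
	return deadLetterTopic
}

func main() {
	fmt.Println(nextTopic(0, 10, "retry", "deadletter"))  // retry
	fmt.Println(nextTopic(10, 10, "retry", "deadletter")) // deadletter
	// With maxRetries set to 0, the first failure skips the retry topic.
	fmt.Println(nextTopic(0, 0, "retry", "deadletter")) // deadletter
}
```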

Exponential backoff

This library supports exponential backoff. To use a backoff policy, set backoffDuration to a duration greater than zero. To disable the backoff policy, set backoffDuration to 0. Backoff applies to the whole client, so it slows down all messages. When a message is processed successfully, the backoff duration is reduced.

Topics

KP uses three topics: the main topic, the retry topic, and the dead letter topic. You can customize the retry and dead letter topic names like so:


processor := kp.NewKafkaProcessor(
	cfg.Kafka.CardReplacementTopic,
	"retry-"+cfg.Kafka.CardReplacementTopic,
	"deadletter-"+cfg.Kafka.CardReplacementTopic,
	10,
	"card-delivery-service",
	kp.KafkaConfig{KafkaBootstrapServers: strings.Split(cfg.Kafka.KafkaBootstrap, ",")},
	time.Second*5,
)

This is useful when you have a worker that listens to multiple topics and each topic needs to have its own retry and dead letter topic.

Retry and dead letter topic names are generated by prefixing the configured name with the consumer group. For example:

  topic: test
  retrytopic: retry
  deadlettertopic: deadletter
  consumergroup: group

resulting topics:

  retrytopic: group-retry
  deadlettertopic: group-deadletter

resulting topic format: <consumer group>-<retry|deadletter topic>
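The naming rule can be expressed in a few lines of Go. The helper prefixedTopic is hypothetical and only mirrors the documented format; it is not a function exported by kp.

```go
package main

import "fmt"

// prefixedTopic is a hypothetical helper, not part of kp's API.
// It joins the consumer group and the configured topic name with a
// hyphen, following "<consumer group>-<retry|deadletter topic>".
func prefixedTopic(consumerGroup, topic string) string {
	return consumerGroup + "-" + topic
}

func main() {
	fmt.Println(prefixedTopic("group", "retry"))      // group-retry
	fmt.Println(prefixedTopic("group", "deadletter")) // group-deadletter
}
```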

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateConsumerSession

func CreateConsumerSession(k *KP, wg *sync.WaitGroup, ctx context.Context) error

func MarshalStringMessage

func MarshalStringMessage(message string, retries int) string

func UnmarshalStringMessage

func UnmarshalStringMessage(message string) (string, int, error)
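The two signatures above suggest that the retry count travels with the message payload. The wire format is not documented here, so the following is only a sketch of the idea using a JSON envelope; the envelope type and the functions marshalEnvelope and unmarshalEnvelope are hypothetical and should not be read as kp's actual encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical wrapper that carries the payload together
// with its retry count. kp's real format may be different.
type envelope struct {
	Message string `json:"message"`
	Retries int    `json:"retries"`
}

// marshalEnvelope mirrors the shape of MarshalStringMessage.
func marshalEnvelope(message string, retries int) string {
	b, err := json.Marshal(envelope{Message: message, Retries: retries})
	if err != nil {
		// Encoding a string and an int is not expected to fail.
		return ""
	}
	return string(b)
}

// unmarshalEnvelope mirrors the shape of UnmarshalStringMessage.
func unmarshalEnvelope(raw string) (string, int, error) {
	var e envelope
	if err := json.Unmarshal([]byte(raw), &e); err != nil {
		return "", 0, err
	}
	return e.Message, e.Retries, nil
}

func main() {
	raw := marshalEnvelope("hello", 2)
	msg, retries, err := unmarshalEnvelope(raw)
	fmt.Println(msg, retries, err) // hello 2 <nil>
}
```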

Types

type Config

type Config struct {
	KafkaConfig KafkaConfig `json:"kafka"`
}

type ConsumerStruct

type ConsumerStruct struct {
	Processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error
	// contains filtered or unexported fields
}

ConsumerStruct represents a Sarama consumer group consumer

func (*ConsumerStruct) Cleanup

func (consumer *ConsumerStruct) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerStruct) ConsumeClaim

func (consumer *ConsumerStruct) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*ConsumerStruct) GetReady

func (Consumer *ConsumerStruct) GetReady() chan bool

func (*ConsumerStruct) Process

func (consumer *ConsumerStruct) Process(ctx context.Context, message *sarama.ConsumerMessage) error

func (*ConsumerStruct) SetReady

func (consumer *ConsumerStruct) SetReady(ready chan bool)

func (*ConsumerStruct) Setup

func (consumer *ConsumerStruct) Setup(sarama.ConsumerGroupSession) error

type KP

type KP struct {
	// contains filtered or unexported fields
}

func (*KP) OnFailure

func (k *KP) OnFailure(failure func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error)

func (*KP) Process

func (k *KP) Process(processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error)

func (*KP) Start

func (k *KP) Start(ctx context.Context) error

func (*KP) Stop

func (k *KP) Stop()

type KPConsumer

type KPConsumer interface {
	Setup(sarama.ConsumerGroupSession) error
	Cleanup(sarama.ConsumerGroupSession) error
	ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error
	GetReady() chan bool
	SetReady(chan bool)
	Process(ctx context.Context, message *sarama.ConsumerMessage) error
}

func NewConsumer

func NewConsumer(topic string, retryTopic string, deadLetterTopic string, retries int, processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error, onFailure *func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error, producer KPProducer, backoffPolicyTime time.Duration) KPConsumer

type KPProducer

type KPProducer interface {
	GetProducer(kafkaConfig KafkaConfig) *Producer
	ProduceMessage(ctx context.Context, topic string, key string, message string) error
}

func NewProducer

func NewProducer(kafkaConfig KafkaConfig) KPProducer

type KafkaConfig

type KafkaConfig struct {
	KafkaBootstrapServers []string `json:"kafka_bootstrap"`
}

type KafkaProcessor

type KafkaProcessor interface {
	Process(processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error)
	Start(ctx context.Context) error
	Stop()
	OnFailure(failure func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error)
}

func NewKafkaProcessor

func NewKafkaProcessor(topic string, retryTopic string, deadLetterTopic string, retries int, consumerGroup string, kafkaConfig KafkaConfig, backoffDuration time.Duration) KafkaProcessor

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func GetProducer

func GetProducer(kafkaConfig KafkaConfig) *Producer

func (*Producer) GetProducer

func (p *Producer) GetProducer(kafkaConfig KafkaConfig) *Producer

func (*Producer) ProduceMessage

func (p *Producer) ProduceMessage(ctx context.Context, topic string, key string, message string) error

Directories

Path Synopsis
examples
simple Module
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
