gracesarama

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2021 License: MIT Imports: 4 Imported by: 1

README

Go Report Card PkgGoDev GitHub License GitHub tag (latest by date)

Grace Sarama Runners

Sarama runners for use with grace.

Consumer Usage

package main

import (
	"context"
	"fmt"
	"github.com/tomwright/grace"
	"github.com/tomwright/gracesarama"
	"github.com/Shopify/sarama"
	"log"
)

// main registers a Sarama consumer group runner with grace and then waits on
// the grace instance.
func main() {
	g := grace.Init(context.Background())

	// Kafka connection details used by this example.
	brokers := []string{"localhost:9092"}
	topics := []string{"topic-a", "topic-b"}

	// Set kafka version.
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0

	consumer := gracesarama.NewConsumerGroupRunner(brokers, "my-group", cfg, topics, &exampleConsumerGroupHandler{})
	// Without an ErrorHandlerFn, errors found while consuming are ignored.
	consumer.ErrorHandlerFn = func(err error) {
		log.Printf("ERROR: %s\n", err.Error())
	}

	g.Run(consumer)
	g.Wait()
}

// exampleConsumerGroupHandler is a minimal sarama.ConsumerGroupHandler that
// prints every message it is handed and marks it as consumed.
type exampleConsumerGroupHandler struct{}

// Setup does nothing in this example and always returns nil.
func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup does nothing in this example and always returns nil.
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim prints the topic, partition and offset of each message on the
// claim and marks the message on the session.
//
// It returns nil when the claim's message channel is closed or when the
// session context is done, whichever happens first.
func (exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// The message channel was closed, so the consume loop must end.
				return nil
			}
			fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			// Stop as soon as the session is cancelled instead of blocking on
			// the message channel; Sarama's own consumer group example does
			// the same to avoid errors while a rebalance is in progress.
			return nil
		}
	}
}

Producer Usage

package main

import (
	"context"
	"github.com/tomwright/grace"
	"github.com/tomwright/gracesarama"
	"github.com/Shopify/sarama"
	"log"
)

// main registers a Sarama producer runner with grace and publishes a message
// through the runner's input channel.
func main() {
	g := grace.Init(context.Background())

	// Set kafka version and have the producer return both errors and successes.
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	brokers := []string{"localhost:9092"}
	producer := gracesarama.NewProducerRunner(brokers, cfg)
	producer.ErrorHandlerFn = func(err *sarama.ProducerError) {
		log.Printf("failed to publish to topic: %s: %s\n", err.Msg.Topic, err.Err.Error())
	}
	producer.SuccessHandlerFn = func(message *sarama.ProducerMessage) {
		log.Printf("message published to topic: %s\n", message.Topic)
	}

	input := producer.Input()

	g.Run(producer)

	// Publish from a separate goroutine so that main can go on to g.Wait().
	publish := func() {
		input <- &sarama.ProducerMessage{
			// ...
		}
	}
	go publish()

	g.Wait()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerGroupRunner

// ConsumerGroupRunner is used to run and gracefully shut down a consumer group.
type ConsumerGroupRunner struct {

	// ErrorHandlerFn handles any errors found while consuming.
	// If it is nil, errors are ignored.
	ErrorHandlerFn func(err error)

	// LogFn is used to log any debug/info messages from the runner.
	// It takes a format string followed by the arguments for that format.
	// Leave it as nil if you do not want any log messages.
	LogFn func(format string, a ...interface{})
	// contains filtered or unexported fields
}

ConsumerGroupRunner is used to run and gracefully shut down a consumer group.

func NewConsumerGroupRunner

func NewConsumerGroupRunner(
	addrs []string, groupID string, config *sarama.Config,
	topics []string,
	handler sarama.ConsumerGroupHandler,
) *ConsumerGroupRunner

NewConsumerGroupRunner returns a runner that controls the start-up and graceful shutdown of a consumer group.

func (*ConsumerGroupRunner) Run

func (cgr *ConsumerGroupRunner) Run(ctx context.Context) error

Run starts up the consumer group.

type ProducerRunner

// ProducerRunner is used to publish messages asynchronously through channels.
type ProducerRunner struct {

	// ErrorHandlerFn is used to handle errors when producing messages.
	// If this is nil, errors are ignored.
	ErrorHandlerFn func(err *sarama.ProducerError)
	// SuccessHandlerFn is used to handle successfully produced messages.
	// If this is nil, successes are ignored.
	SuccessHandlerFn func(message *sarama.ProducerMessage)
	// contains filtered or unexported fields
}

ProducerRunner is used to publish messages asynchronously through channels.

func NewProducerRunner

func NewProducerRunner(addrs []string, config *sarama.Config) *ProducerRunner

NewProducerRunner returns a producer runner that can publish messages asynchronously through channels.

func (*ProducerRunner) Input

func (cgr *ProducerRunner) Input() chan<- *sarama.ProducerMessage

func (*ProducerRunner) Run

func (cgr *ProducerRunner) Run(ctx context.Context) error

Run starts up the producer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL