kamux

package module
v0.0.0-...-57973a1 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 15, 2023 License: MIT Imports: 8 Imported by: 0

README


Kamux

Kamux is a simple library to interact with a Kafka cluster.

It can:

  • Connect to a Kafka cluster (SASL over TLS with user/password only)
  • Listen to one or multiple topics on a specific consumer group
  • Execute the function of your choice (handler)
  • Mark offset or not after handling
  • Listen to SIGINT and stop processing messages gracefully
  • Override sarama cluster consumer configuration before Launch()

A simple example:

func main() {
    config := &kamux.Config{
        Brokers:       []string{"kafka.broker1.net:9093", "kafka.broker2.net:9093"},
        User:          "myKafkaUser",
        Password:      "myPassword",
        Topics:        []string{"myTopic"},
        ConsumerGroup: "myConsumerGroup",
        Handler:       Handler,
        MarkOffsets:   true,
    }

    // Name the instance k so it does not shadow the kamux package.
    k, err := kamux.NewKamux(config)
    if err != nil {
        log.Fatalf("Failed to init kamux: %s", err)
    }

    // Launch returns an error, so check it rather than discarding it.
    if err := k.Launch(); err != nil {
        log.Fatalf("Kamux stopped with error: %s", err)
    }
}

// Handler is called for each consumed message.
func Handler(sm *sarama.ConsumerMessage) error {
    log.Printf("Received a message on topic: %s (partition: %d | offset: %d)", sm.Topic, sm.Partition, sm.Offset)
    return nil
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Brokers defines the list of kafka brokers to connect to.
	Brokers []string
	// User is the Kafka's user.
	User string
	// Password is the Kafka's password.
	Password string
	// Topics are all the topics on which consumer groups listen.
	Topics []string
	// ConsumerGroup is the name of the consumer group to use.
	ConsumerGroup string
	// The InitialOffset to use if no offset was previously committed.
	// Should be sarama.OffsetNewest or sarama.OffsetOldest. Defaults to sarama.OffsetNewest.
	InitialOffset int64
	// Handler is the function executed on each kafka message.
	Handler func(*sarama.ConsumerMessage) error
	// ErrHandler is the function executed when Handler returns an error, to try to recover from it.
	ErrHandler func(error, *sarama.ConsumerMessage) error
	// PreRun is the function executed before processing is launched.
	PreRun func(*Kamux) error
	// PostRun is the function executed on kamux close.
	PostRun func(*Kamux) error
	// StopOnError, whether or not to stop processing on handler error.
	StopOnError bool
	// MarkOffsets, whether or not to mark offsets on each message processing.
	MarkOffsets bool
	// Debug enables debug mode, more verbose output
	Debug bool
	// MessagesBufferSize is the buffer size of the messages that a worker can queue.
	MessagesBufferSize int
	// ForceKafkaVersion overrides kafka cluster version on sarama library.
	ForceKafkaVersion *sarama.KafkaVersion
	// Logger is used to print some Kamux's information. Golang's log package is used as default.
	Logger Logger
}

A Config holds all the configuration of a Kamux instance.

type Kamux

type Kamux struct {
	Config         *Config
	ConsumerConfig *sarama.Config
	// contains filtered or unexported fields
}

Kamux is the main object of the Kamux library.

func NewKamux

func NewKamux(config *Config) (kamux *Kamux, err error)

NewKamux is the constructor of Kamux. It performs some config checks and prepares the Kafka connections for the upcoming launch of the process.

func (*Kamux) Cleanup

func (kamux *Kamux) Cleanup(sarama.ConsumerGroupSession) (err error)

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited but before the offsets are committed for the very last time.

func (*Kamux) ConsumeClaim

func (kamux *Kamux) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
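
The loop shape described above can be sketched with a plain channel. This is an illustration only, assuming a hypothetical message type in place of *sarama.ConsumerMessage and a hypothetical consume function; it does not reproduce Kamux's ConsumeClaim.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Offset int64
}

// consume ranges over msgs until the channel is closed, calling handle for
// each message, and returns the offsets it handled successfully.
func consume(msgs <-chan *message, handle func(*message) error) ([]int64, error) {
	var handled []int64
	for m := range msgs {
		if err := handle(m); err != nil {
			return handled, err
		}
		handled = append(handled, m.Offset)
	}
	// The channel was closed: leave the loop and return cleanly.
	return handled, nil
}

func main() {
	msgs := make(chan *message, 3)
	for i := int64(0); i < 3; i++ {
		msgs <- &message{Offset: i}
	}
	close(msgs)

	handled, err := consume(msgs, func(*message) error { return nil })
	fmt.Println(handled, err)
}
```

Ranging over the channel means the loop ends on its own when the channel is closed, which is the exit condition the description asks for.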

func (*Kamux) Launch

func (kamux *Kamux) Launch() (err error)

Launch begins processing the Kafka messages. It can be launched only once. It will:

  • Connect to kafka using credentials provided in configuration
  • Listen to consumer group notifications (rebalance, ...)
  • Listen to consumer errors, and stop properly in case of one
  • Listen to system SIGINT to stop properly

func (*Kamux) Setup

func (kamux *Kamux) Setup(sarama.ConsumerGroupSession) (err error)

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*Kamux) Stop

func (kamux *Kamux) Stop() error

Stop stops processing without an error.

func (*Kamux) StopWithError

func (kamux *Kamux) StopWithError(err error) error

StopWithError stops processing with the error passed as argument.

type Logger

type Logger interface {
	Printf(format string, args ...interface{})
	Println(args ...interface{})

	Fatalf(format string, args ...interface{})
	Fatal(args ...interface{})

	Panicf(format string, args ...interface{})
	Panic(args ...interface{})
}

A Logger is the interface used in this package for logging, so that any backend can be plugged in.
