Documentation ¶
Index ¶
- type Config
- type Kamux
- func (kamux *Kamux) Cleanup(sarama.ConsumerGroupSession) (err error)
- func (kamux *Kamux) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
- func (kamux *Kamux) Launch() (err error)
- func (kamux *Kamux) Setup(sarama.ConsumerGroupSession) (err error)
- func (kamux *Kamux) Stop() error
- func (kamux *Kamux) StopWithError(err error) error
- type Logger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Brokers defines the list of kafka brokers to connect to. Brokers []string // User is the Kafka's user. User string // Password is the Kafka's password. Password string // Topics are all the topics on which consumer groups listen. Topics []string // ConsumerGroup is the name of the consumer group to use. ConsumerGroup string // The InitialOffset to use if no offset was previously committed. // Should be sarama.OffsetNewest or sarama.OffsetOldest. Defaults to sarama.OffsetNewest. InitialOffset int64 // Handler is the function executed on each kafka message. Handler func(*sarama.ConsumerMessage) error // ErrHandler is the function executed on Handler's error used to trying to rescue the error. ErrHandler func(error, *sarama.ConsumerMessage) error // PreRun is the function executed before the launch on processing. PreRun func(*Kamux) error // PostRun is the function executed on kamux close. PostRun func(*Kamux) error // StopOnError, whether or not to stop processing on handler error. StopOnError bool // MarkOffsets, whether or not to mark offsets on each message processing. MarkOffsets bool // Debug enables debug mode, more verbose output Debug bool // MessagesBufferSize is the buffer size of the messages that a worker can queue. MessagesBufferSize int // ForceKafkaVersion overrides kafka cluster version on sarama library. ForceKafkaVersion *sarama.KafkaVersion // Logger is used to print some Kamux's information. Golang's log package is used as default. Logger Logger }
A Config holds all the configuration of the Kamux class.
type Kamux ¶
type Kamux struct { Config *Config ConsumerConfig *sarama.Config // contains filtered or unexported fields }
Kamux is the main object for the Kamux
func NewKamux ¶
NewKamux is the constructor of the ConsumerProducer It will make some config checks, and prepare the kafka connections for the upcoming launch of the process
func (*Kamux) Cleanup ¶
func (kamux *Kamux) Cleanup(sarama.ConsumerGroupSession) (err error)
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exites but before the offsets are committed for the very last time.
func (*Kamux) ConsumeClaim ¶
func (kamux *Kamux) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) (err error)
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Once the Messages() channel is closed, the Handler must finish its processing loop and exit.
func (*Kamux) Launch ¶
Launch will begin the processing of the kafka messages It can be launched only once. It will :
- Connect to kafka using credentials provided in configuration
- Listen to consumer group notifications (rebalance,...)
- Listen to consumer errors, and stop properly in case of one
- Listen to system SIGINT to stop properly
func (*Kamux) Setup ¶
func (kamux *Kamux) Setup(sarama.ConsumerGroupSession) (err error)
Setup is run at the beginning of a new session, before ConsumeClaim.
func (*Kamux) StopWithError ¶
StopWithError will stop processing with error passed as argument
type Logger ¶
type Logger interface { Printf(format string, args ...interface{}) Println(args ...interface{}) Fatalf(format string, args ...interface{}) Fatal(args ...interface{}) Panicf(format string, args ...interface{}) Panic(args ...interface{}) }
A Logger is the interface used in this package for logging, so that any backend can be plugged in.